diff --git a/tskv/src/reader/iterator.rs b/tskv/src/reader/iterator.rs
index d29b8536b5..fe8b227c63 100644
--- a/tskv/src/reader/iterator.rs
+++ b/tskv/src/reader/iterator.rs
@@ -1,3 +1,6 @@
+use std::cmp::{Ordering, Reverse};
+use std::collections::BinaryHeap;
+use std::mem::MaybeUninit;
 use std::sync::Arc;
 
 use datafusion::arrow::array::{
@@ -11,6 +14,7 @@ use datafusion::arrow::datatypes::{
 };
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::physical_plan::metrics::{self, ExecutionPlanMetricsSet, MetricBuilder};
+use futures::future::join_all;
 use minivec::MiniVec;
 use models::meta_data::VnodeId;
 use models::predicate::domain::{self, QueryArgs, QueryExpr, TimeRanges};
@@ -29,8 +33,8 @@ use crate::compute::count::count_column_non_null_values;
 use crate::error::Result;
 use crate::memcache::DataType;
 use crate::reader::Cursor;
-use crate::tseries_family::SuperVersion;
-use crate::tsm::{BlockMetaIterator, DataBlock, TsmReader};
+use crate::tseries_family::{ColumnFile, SuperVersion, Version};
+use crate::tsm::{BlockMetaIterator, DataBlockReader, TsmReader};
 use crate::{EngineRef, Error};
 
 pub type CursorPtr = Box<dyn Cursor>;
 
@@ -396,13 +400,173 @@ pub struct FieldFileLocation {
     block_meta_iter: BlockMetaIterator,
     time_ranges: Arc<TimeRanges>,
 
-    data_block: DataBlock,
-    /// The first index of a DataType in a DataBlock
-    data_block_i: usize,
-    /// The last index of a DataType in a DataBlock
-    data_block_i_end: usize,
-    intersected_time_ranges: TimeRanges,
-    intersected_time_ranges_i: usize,
+    data_block_reader: DataBlockReader,
+}
+
+struct DataTypeWithFileId {
+    file_id: u64,
+    data_type: DataType,
+}
+
+impl DataTypeWithFileId {
+    pub fn new(data_type: DataType, file_id: u64) -> Self {
+        Self { file_id, data_type }
+    }
+
+    pub fn take(self) -> DataType {
+        self.data_type
+    }
+}
+
+impl Eq for DataTypeWithFileId {}
+
+impl PartialEq for DataTypeWithFileId {
+    fn eq(&self, other: &Self) -> bool {
+        self.data_type.eq(&other.data_type) && self.file_id.eq(&other.file_id)
+    }
+}
+
+impl PartialOrd for DataTypeWithFileId {
+    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+impl Ord for DataTypeWithFileId {
+    fn cmp(&self, other: &Self) -> Ordering {
+        match self.data_type.cmp(&other.data_type) {
+            Ordering::Equal => self.file_id.cmp(&other.file_id),
+            other => other,
+        }
+    }
+}
+
+pub struct LevelTSDataStream {
+    field_file_locations: Vec<Box<dyn Iterator<Item = FieldFileLocation> + Send + 'static>>,
+    peeked_file_locations: Vec<Option<FieldFileLocation>>,
+
+    data_heap: BinaryHeap<Reverse<DataTypeWithFileId>>,
+    cached_data_type: Option<DataTypeWithFileId>,
+}
+
+unsafe impl Send for LevelTSDataStream {}
+unsafe impl Sync for LevelTSDataStream {}
+
+async fn open_field_file_location(
+    column_file: Arc<ColumnFile>,
+    version: Arc<Version>,
+    time_ranges: Arc<TimeRanges>,
+    field_id: FieldId,
+    value_type: ValueType,
+) -> Result<Vec<FieldFileLocation>> {
+    let tsm_reader = version.get_tsm_reader(column_file.file_path()).await?;
+    let res = tsm_reader
+        .index_iterator_opt(field_id)
+        .map(move |index_meta| {
+            FieldFileLocation::new(
+                tsm_reader.clone(),
+                time_ranges.clone(),
+                index_meta.block_iterator_opt(time_ranges.clone()),
+                value_type,
+            )
+        })
+        .collect();
+    Ok(res)
+}
+
+impl LevelTSDataStream {
+    pub async fn new(
+        version: Arc<Version>,
+        time_ranges: Arc<TimeRanges>,
+        column_files: Vec<Arc<ColumnFile>>,
+        field_id: FieldId,
+        value_type: ValueType,
+    ) -> Result<Self> {
+        let mut field_file_locations: Vec<
+            Box<dyn Iterator<Item = FieldFileLocation> + 'static + Send>,
+        > = vec![];
+        let locations_future = column_files.into_iter().map(|f| {
+            open_field_file_location(
+                f,
+                version.clone(),
+                time_ranges.clone(),
+                field_id,
+                value_type,
+            )
+        });
+        for ls in join_all(locations_future)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<_>>>()?
+            .into_iter()
+        {
+            field_file_locations.push(Box::new(ls.into_iter()))
+        }
+        let peeked_file_locations = field_file_locations.iter().map(|_| None).collect();
+
+        Ok(Self {
+            field_file_locations,
+            peeked_file_locations,
+            data_heap: BinaryHeap::new(),
+            cached_data_type: None,
+        })
+    }
+
+    async fn next_data(&mut self) -> Result<Option<DataType>> {
+        let mut has_finished = false;
+        for (peeked_location, files_location) in self
+            .peeked_file_locations
+            .iter_mut()
+            .zip(self.field_file_locations.iter_mut())
+        {
+            if peeked_location.is_none() {
+                *peeked_location = files_location.next();
+            }
+
+            loop {
+                if let Some(location) = peeked_location {
+                    match location.next_data().await? {
+                        None => {
+                            *peeked_location = files_location.next();
+                        }
+                        Some(data) => {
+                            let data = DataTypeWithFileId::new(data, location.get_file_id());
+                            self.data_heap.push(Reverse(data));
+                            break;
+                        }
+                    }
+                } else {
+                    has_finished = true;
+                    break;
+                }
+            }
+        }
+
+        // Drop the file location iterators that have finished.
+        if has_finished {
+            // SAFETY: `peeked_file_locations` and `field_file_locations` always have
+            // the same length, so `un_finish_iter` yields exactly one element for
+            // every entry visited by `retain` and `unwrap_unchecked` never sees `None`.
+            unsafe {
+                let mut un_finish_iter = self.peeked_file_locations.iter().map(|a| a.is_some());
+                debug_assert!(un_finish_iter.len().eq(&self.field_file_locations.len()));
+                self.field_file_locations
+                    .retain(|_| un_finish_iter.next().unwrap_unchecked());
+            }
+            self.peeked_file_locations.retain(|e| e.is_some());
+        }
+
+        loop {
+            return match self.data_heap.pop() {
+                Some(Reverse(data)) => {
+                    if let Some(Reverse(next_data)) = self.data_heap.peek() {
+                        // Deduplication: if the next heap entry compares equal
+                        // (same timestamp), skip the current one and pop again.
+                        if data.data_type.eq(&next_data.data_type) {
+                            continue;
+                        }
+                    }
+                    Ok(Some(data.take()))
+                }
+                None => Ok(None),
+            };
+        }
+    }
+}
 
 impl FieldFileLocation {
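`LevelTSDataStream::next_data` above is a k-way merge: every TSM file in a level contributes an ascending stream, a `BinaryHeap` of `Reverse`-wrapped entries always exposes the smallest pending timestamp, and equal keys are collapsed by peeking at the next heap entry. A minimal standalone sketch of the same pattern (illustrative names, not part of the patch; plain `i64` timestamps stand in for `DataType`, and the source index stands in for `file_id`):

use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge several ascending streams into one ascending stream, dropping
/// duplicate keys. The source index breaks ties between equal keys.
fn merge_dedup(sources: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<_> = sources.into_iter().map(|v| v.into_iter()).collect();
    let mut heap: BinaryHeap<Reverse<(i64, usize)>> = BinaryHeap::new();
    // Seed the heap with the head element of every source.
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some(ts) = it.next() {
            heap.push(Reverse((ts, i)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((ts, i))) = heap.pop() {
        // Refill from the source that was just consumed.
        if let Some(next) = iters[i].next() {
            heap.push(Reverse((next, i)));
        }
        // Deduplicate: emit only if the next-smallest key differs.
        let dup = matches!(heap.peek(), Some(Reverse((t, _))) if *t == ts);
        if !dup {
            out.push(ts);
        }
    }
    out
}

fn main() {
    let merged = merge_dedup(vec![vec![1, 3, 5], vec![2, 3, 6], vec![3, 7]]);
    assert_eq!(merged, vec![1, 2, 3, 5, 6, 7]);
}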
@@ -416,42 +580,31 @@ impl FieldFileLocation {
             reader,
             block_meta_iter,
             time_ranges,
-            // TODO: can here use unsafe api MaybeUninit ?
-            data_block: DataBlock::new(0, vtype),
-            // Let data block index > end index when init to make it load from reader
-            // for the first time to `peek()`.
-            data_block_i: 1,
-            data_block_i_end: 0,
-            intersected_time_ranges: TimeRanges::empty(),
-            intersected_time_ranges_i: 0,
+            data_block_reader: DataBlockReader::new_uninit(vtype),
         }
     }
 
-    pub async fn peek(&mut self) -> Result<Option<DataType>> {
-        // Check if we need to init.
-        if self.data_block_i > self.data_block_i_end
-            && !self.next_intersected_index_range()
-            && !self.next_data_block().await?
-        {
-            return Ok(None);
+    /// Returns the next `DataType`, loading the next `DataBlockReader` when the
+    /// current one is exhausted; returns `None` once all data blocks are consumed.
+    pub async fn next_data(&mut self) -> Result<Option<DataType>> {
+        let res = self.data_block_reader.next();
+        if res.is_some() {
+            return Ok(res);
         }
-
-        Ok(self.data_block.get(self.data_block_i))
-    }
-
-    pub fn next(&mut self) {
-        self.data_block_i += 1;
+        if let Some(reader) = self.next_data_block_reader().await? {
+            self.data_block_reader = reader;
+            debug_assert!(self.data_block_reader.has_next());
+            return Ok(self.data_block_reader.next());
+        }
+        Ok(None)
     }
 
-    /// Iterates the ramaining BlockMeta in `block_meta_iter`, if there are no remaining BlockMeta's,
-    /// then return Ok(false).
+    /// Iterates the remaining `BlockMeta`s in `block_meta_iter`; if there are none
+    /// remaining, returns `Ok(None)`.
     ///
-    /// Iteration will continue until there are intersected time range between DataBlock and `time_ranges`.
-    async fn next_data_block(&mut self) -> Result<bool> {
-        let mut has_next_block = false;
-
+    /// Iteration continues until a DataBlock's time range intersects with `time_ranges`.
+    async fn next_data_block_reader(&mut self) -> Result<Option<DataBlockReader>> {
         // Get next BlockMeta to locate the next DataBlock from file.
-        while let Some(meta) = self.block_meta_iter.next() {
+        for meta in self.block_meta_iter.by_ref() {
             if meta.count() == 0 {
                 continue;
             }
@@ -459,48 +612,19 @@ impl FieldFileLocation {
             // Check if the time range of the BlockMeta intersected with the given time ranges.
             if let Some(intersected_tr) = self.time_ranges.intersect(&time_range) {
                 // Load a DataBlock from reader by BlockMeta.
-                self.data_block = self.reader.get_data_block(&meta).await?;
-                self.intersected_time_ranges = intersected_tr;
-                self.intersected_time_ranges_i = 0;
-                if self.next_intersected_index_range() {
-                    // Found next DataBlock and range to iterate.
-                    has_next_block = true;
-                    break;
+                let block = self.reader.get_data_block(&meta).await?;
+                let mut data_block_reader = DataBlockReader::new(block, intersected_tr);
+                if data_block_reader.has_next() {
+                    return Ok(Some(data_block_reader));
                 }
             }
         }
 
-        Ok(has_next_block)
+        Ok(None)
     }
 
-    /// Iterates the ramaining TimeRange in `intersected_time_ranges`, if there are no remaning TimeRange's.
-    /// then return false.
-    ///
-    /// If there are overlaped time range of DataBlock and TimeRanges, set iteration range of `data_block`
-    /// and return true, otherwise set the iteration range a zero-length range `[1, 0]` and return false.
-    ///
-    /// **Note**: Call of this method should be arranged after the call of method `next_data_block`.
-    fn next_intersected_index_range(&mut self) -> bool {
-        self.data_block_i = 1;
-        self.data_block_i_end = 0;
-        if self.intersected_time_ranges.is_empty()
-            || self.intersected_time_ranges_i >= self.intersected_time_ranges.len()
-        {
-            false
-        } else {
-            let tr_idx_start = self.intersected_time_ranges_i;
-            for tr in self.intersected_time_ranges.time_ranges()[tr_idx_start..].iter() {
-                self.intersected_time_ranges_i += 1;
-                // Check if the DataBlock matches one of the intersected time ranges.
-                // TODO: sometimes the comparison in loop can stop earily.
-                if let Some((min, max)) = self.data_block.index_range(tr) {
-                    self.data_block_i = min;
-                    self.data_block_i_end = max;
-                    return true;
-                }
-            }
-            false
-        }
+    pub fn get_file_id(&self) -> u64 {
+        self.reader.file_id()
     }
 }
 
@@ -509,11 +633,7 @@ impl std::fmt::Debug for FieldFileLocation {
         f.debug_struct("FieldFileLocation")
             .field("file_id", &self.reader.file_id())
             .field("time_ranges", &self.time_ranges)
-            .field("data_block_range", &self.data_block.time_range())
-            .field("data_block_i", &self.data_block_i)
-            .field("data_block_i_end", &self.data_block_i_end)
-            .field("intersected_time_ranges", &self.intersected_time_ranges)
-            .field("intersected_time_ranges_i", &self.intersected_time_ranges_i)
+            .field("data_block_reader", &self.data_block_reader)
             .finish()
     }
 }
@@ -541,9 +661,7 @@ impl Cursor for TimeCursor {
         ColumnType::Time(self.unit.clone())
     }
 
-    async fn next(&mut self, _ts: i64) {}
-
-    async fn peek(&mut self) -> Result<Option<DataType>> {
+    async fn next(&mut self) -> Result<Option<DataType>> {
         let data = DataType::I64(self.ts, self.ts);
 
         Ok(Some(data))
@@ -575,9 +693,7 @@ impl Cursor for TagCursor {
         ColumnType::Tag
     }
 
-    async fn next(&mut self, _ts: i64) {}
-
-    async fn peek(&mut self) -> Result<Option<DataType>> {
+    async fn next(&mut self) -> Result<Option<DataType>> {
         Ok(self.value.clone())
     }
 }
@@ -587,39 +703,80 @@ pub struct FieldCursor {
     name: Arc<String>,
     value_type: ValueType,
 
-    cache_index: usize,
-    cache_data: Vec<DataType>,
-    locations: Vec<FieldFileLocation>,
+    cache_data: Box<dyn Iterator<Item = DataType> + 'static>,
+    peeked_cache: Option<DataType>,
+    level_data_stream: [Option<LevelTSDataStream>; 5],
+
+    peeked_l0: Option<DataType>,
+    peeked_l14: Option<DataType>,
 }
 
+unsafe impl Sync for FieldCursor {}
+unsafe impl Send for FieldCursor {}
+
 impl FieldCursor {
     pub fn empty(value_type: ValueType, name: Arc<String>) -> Self {
+        let mut level_data_stream_uninit = MaybeUninit::uninit_array();
+        level_data_stream_uninit.iter_mut().for_each(|u| {
+            u.write(None);
+        });
+        Self {
+            name,
+            value_type,
+            cache_data: Box::new(std::iter::empty()),
+            peeked_cache: None,
+            peeked_l0: None,
+            peeked_l14: None,
+            level_data_stream: unsafe { MaybeUninit::array_assume_init(level_data_stream_uninit) },
+        }
+    }
+
+    pub fn new(
+        name: Arc<String>,
+        value_type: ValueType,
+        cache_data: Box<dyn Iterator<Item = DataType> + 'static>,
+        level_data_stream: [Option<LevelTSDataStream>; 5],
+    ) -> Self {
         Self {
             name,
             value_type,
-            cache_index: 0,
-            cache_data: Vec::new(),
-            locations: Vec::new(),
+            cache_data,
+            peeked_cache: None,
+            peeked_l0: None,
+            peeked_l14: None,
+            level_data_stream,
         }
     }
 
-    fn peek_cache(&mut self) -> Option<&DataType> {
-        let mut opt_top = self.cache_data.get(self.cache_index);
-        let mut opt_next = self.cache_data.get(self.cache_index + 1);
+    async fn next_l0_data(&mut self) -> Result<Option<DataType>> {
+        match self.level_data_stream[0].as_mut() {
+            None => Ok(None),
+            Some(stream) => match stream.next_data().await? {
+                None => {
+                    // The level-0 stream is exhausted, drop it.
+                    self.level_data_stream[0] = None;
+                    Ok(None)
+                }
+                other => Ok(other),
+            },
+        }
+    }
 
-        while let (Some(top), Some(next)) = (opt_top, opt_next) {
-            // if timestamp is same, select next data
-            // deduplication
-            if top.timestamp() == next.timestamp() {
-                self.cache_index += 1;
-                opt_top = Some(next);
-                opt_next = self.cache_data.get(self.cache_index + 1);
-            } else {
-                break;
+    async fn next_l14_data(&mut self) -> Result<Option<DataType>> {
+        for stream_opt in self.level_data_stream.iter_mut().skip(1).rev() {
+            match stream_opt {
+                None => {
+                    continue;
+                }
+                Some(stream) => match stream.next_data().await? {
+                    None => {
+                        *stream_opt = None;
+                        continue;
+                    }
+                    other => return Ok(other),
+                },
             }
         }
-
-        opt_top
+        Ok(None)
     }
 }
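`FieldCursor::next` below merges three peeked sources (memcache, level 0, levels 1-4), always yielding the smallest timestamp and, on a tie, preferring the fresher source while discarding the shadowed value. A minimal two-source sketch of that tie-breaking merge (illustrative types, not part of the patch; the real code compares `DataType` values and is async):

use std::cmp::Ordering;

type Src = std::iter::Peekable<std::vec::IntoIter<(i64, &'static str)>>;

/// Merge two ascending (timestamp, value) streams; on equal timestamps the
/// cache wins and the shadowed file-side value is silently consumed.
fn next_merged(cache: &mut Src, file: &mut Src) -> Option<(i64, &'static str)> {
    match (cache.peek().copied(), file.peek().copied()) {
        (Some(c), Some(f)) => match f.0.cmp(&c.0) {
            Ordering::Less => file.next(),
            Ordering::Equal => {
                file.next(); // drop the shadowed on-disk value
                cache.next()
            }
            Ordering::Greater => cache.next(),
        },
        (Some(_), None) => cache.next(),
        (None, Some(_)) => file.next(),
        (None, None) => None,
    }
}

fn main() {
    let mut cache = vec![(2, "mem"), (4, "mem")].into_iter().peekable();
    let mut file = vec![(1, "tsm"), (2, "tsm"), (3, "tsm")].into_iter().peekable();
    let mut row = Vec::new();
    while let Some(v) = next_merged(&mut cache, &mut file) {
        row.push(v);
    }
    assert_eq!(row, vec![(1, "tsm"), (2, "mem"), (3, "tsm"), (4, "mem")]);
}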
@@ -629,47 +786,54 @@ impl Cursor for FieldCursor {
         &self.name
     }
 
-    async fn peek(&mut self) -> Result<Option<DataType>> {
-        let mut data = DataType::new(self.value_type, i64::MAX);
-        for loc in self.locations.iter_mut() {
-            if let Some(val) = loc.peek().await? {
-                if data.timestamp() >= val.timestamp() {
-                    data = val;
-                }
-            }
-        }
+    fn column_type(&self) -> ColumnType {
+        ColumnType::Field(self.value_type)
+    }
 
-        if let Some(val) = self.peek_cache() {
-            if data.timestamp() >= val.timestamp() {
-                data = val.clone();
-            }
+    async fn next(&mut self) -> Result<Option<DataType>> {
+        if self.peeked_cache.is_none() {
+            self.peeked_cache = self.cache_data.next();
         }
-        if data.timestamp() == i64::MAX {
-            return Ok(None);
+        if self.peeked_l0.is_none() {
+            self.peeked_l0 = self.next_l0_data().await?;
         }
-        Ok(Some(data))
-    }
 
-    async fn next(&mut self, ts: i64) {
-        if let Some(val) = self.peek_cache() {
-            if val.timestamp() == ts {
-                self.cache_index += 1;
-            }
+        if self.peeked_l14.is_none() {
+            self.peeked_l14 = self.next_l14_data().await?;
         }
 
-        for loc in self.locations.iter_mut() {
-            if let Some(val) = loc.peek().await.unwrap() {
-                if val.timestamp() == ts {
-                    loc.next();
+        let peeked_file_data = match (&self.peeked_l0, &self.peeked_l14) {
+            (Some(l0), Some(l14)) => match l0.timestamp().cmp(&l14.timestamp()) {
+                Ordering::Less => Some(&mut self.peeked_l0),
+                Ordering::Equal => {
+                    // Level 0 holds newer data; drop the duplicate from levels 1-4.
+                    self.peeked_l14.take();
+                    Some(&mut self.peeked_l0)
                 }
-            }
+                Ordering::Greater => Some(&mut self.peeked_l14),
+            },
+            (Some(_), None) => Some(&mut self.peeked_l0),
+            (None, Some(_)) => Some(&mut self.peeked_l14),
+            (None, None) => None,
+        };
+        let peeked_cache_data = &mut self.peeked_cache;
+        match (peeked_file_data, peeked_cache_data.as_ref()) {
+            (Some(file_data_opt), Some(cache_data)) => match file_data_opt {
+                None => Ok(peeked_cache_data.take()),
+                Some(file_data) => match file_data.timestamp().cmp(&cache_data.timestamp()) {
+                    Ordering::Less => Ok(file_data_opt.take()),
+                    Ordering::Equal => {
+                        // The cache holds the newest value; drop the file-side duplicate.
+                        file_data_opt.take();
+                        Ok(peeked_cache_data.take())
+                    }
+                    Ordering::Greater => Ok(peeked_cache_data.take()),
+                },
+            },
+            (Some(res), None) => Ok(res.take()),
+            (None, Some(_)) => Ok(peeked_cache_data.take()),
+            (None, None) => Ok(None),
         }
     }
-
-    fn column_type(&self) -> ColumnType {
-        ColumnType::Field(self.value_type)
-    }
 }
 
 pub struct RowIterator {
@@ -807,6 +971,8 @@ impl RowIterator {
         end: usize,
         sender: Sender<Option<Result<RecordBatch>>>,
     ) {
+        let row_cols: Vec<Option<DataType>> =
+            vec![None; self.query_option.table_schema.columns().len()];
         let mut iter = SeriesGroupRowIterator {
             runtime: self.runtime.clone(),
             engine: self.engine.clone(),
@@ -824,6 +990,7 @@
                 .span_recorder
                 .child(format!("SeriesGroupRowIterator [{}, {})", start, end)),
             metrics: SeriesGroupRowIteratorMetrics::new(&self.metrics_set, start),
+            row_cols,
         };
         let can_tok = self.series_iter_closer.clone();
         self.runtime.spawn(async move {
@@ -1010,6 +1177,8 @@ struct SeriesGroupRowIterator {
    #[allow(unused)]
     span_recorder: SpanRecorder,
     metrics: SeriesGroupRowIteratorMetrics,
+    // Caches the value most recently pulled from each column's cursor.
+    row_cols: Vec<Option<DataType>>,
 }
 
 impl SeriesGroupRowIterator {
@@ -1156,6 +1325,33 @@ impl SeriesGroupRowIterator {
         Ok(())
     }
 
+    async fn build_level_ts_stream(
+        &self,
+        version: Arc<Version>,
+        time_ranges: Arc<TimeRanges>,
+        field_id: FieldId,
+        value_type: ValueType,
+    ) -> Result<[Option<LevelTSDataStream>; 5]> {
+        let mut res_uninit = MaybeUninit::uninit_array();
+        let level_files = version.get_level_files(&time_ranges, field_id);
+        for (res, level_file) in res_uninit.iter_mut().zip(level_files) {
+            if let Some(files) = level_file {
+                let stream = LevelTSDataStream::new(
+                    version.clone(),
+                    time_ranges.clone(),
+                    files,
+                    field_id,
+                    value_type,
+                )
+                .await?;
+                res.write(Some(stream));
+            } else {
+                res.write(None);
+            }
+        }
+        Ok(unsafe { MaybeUninit::array_assume_init(res_uninit) })
+    }
+
     /// Build a FieldCursor with cached data and file locations.
     async fn build_field_cursor(
         &self,
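`build_level_ts_stream` fills a fixed-size `[Option<LevelTSDataStream>; 5]` via `MaybeUninit::uninit_array` and `array_assume_init`, which are nightly-only APIs. For reference, a sketch of a stable alternative (`std::array::from_fn`, stable since Rust 1.63) that builds the same shape without `unsafe`; `LevelStream` is a hypothetical stand-in type:

// Stand-in for LevelTSDataStream. Option<LevelStream> is not Copy, so the
// plain `[None; 5]` array-repeat form would not compile for it on stable.
struct LevelStream;

fn build_levels() -> [Option<LevelStream>; 5] {
    // Initializes each of the five slots by calling the closure; no unsafe needed.
    std::array::from_fn(|_| None)
}

fn main() {
    let levels = build_levels();
    assert!(levels.iter().all(|l| l.is_none()));
}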
@@ -1171,9 +1367,6 @@ impl SeriesGroupRowIterator {
         let time_ranges_ref = self.query_option.split.time_ranges();
         let time_predicate = |ts| time_ranges_ref.is_boundless() || time_ranges_ref.contains(ts);
         debug!("Pushed down time range filter: {:?}", time_ranges_ref);
-
-        let timer = self.metrics.elapsed_get_data_from_memcache().timer();
-
         // Get data from im_memcache and memcache
         let mut cache_data: Vec<DataType> = Vec::new();
         super_version.caches.read_field_data(
@@ -1182,90 +1375,64 @@
             |_| true,
             |d| cache_data.push(d),
         );
-        cache_data.sort_by_key(|data| data.timestamp());
-        timer.done();
 
+        // Sort by timestamp, then dedup in descending order so the newest value
+        // wins for each timestamp, and hand out an ascending iterator again.
+        cache_data.sort_by_key(|data| data.timestamp());
+        cache_data.reverse();
+        cache_data.dedup_by_key(|data| data.timestamp());
         debug!(
             "build memcache data id: {:02X}, len: {}",
             field_id,
             cache_data.len()
         );
+        let cache_data_iter = cache_data.into_iter().rev();
+
+        let level_ts_stream = self
+            .build_level_ts_stream(
+                super_version.version.clone(),
+                time_ranges_ref.clone(),
+                field_id,
+                field_type,
+            )
+            .await?;
+        let cursor = FieldCursor::new(
+            field_name.clone(),
+            field_type,
+            Box::new(cache_data_iter),
+            level_ts_stream,
+        );
 
-        let timer = self.metrics.elapsed_get_field_location().timer();
-
-        // Get data from level info, find time range overlapped files and file locations.
-        // TODO: Init locations in parallel with other fields.
-        let mut locations = vec![];
-        for level in super_version.version.levels_info.iter().rev() {
-            if !time_ranges_ref.overlaps(&level.time_range) {
-                continue;
-            }
-            for file in level.files.iter() {
-                if !time_ranges_ref.overlaps(file.time_range()) || !file.contains_field_id(field_id)
-                {
-                    continue;
-                }
-                let path = file.file_path();
-                debug!(
-                    "Building FieldCursor: field: {:02X}, path: '{}'",
-                    field_id,
-                    path.display()
-                );
-
-                if !path.is_file() {
-                    return Err(Error::TsmFileBroken {
-                        source: crate::tsm::ReadTsmError::FileNotFound {
-                            reason: format!("File Not Found: {}", path.display()),
-                        },
-                    });
-                }
-
-                let tsm_reader = super_version.version.get_tsm_reader(path).await?;
-                for idx_meta in tsm_reader.index_iterator_opt(field_id) {
-                    let location = FieldFileLocation::new(
-                        tsm_reader.clone(),
-                        time_ranges_ref.clone(),
-                        idx_meta.block_iterator_opt(time_ranges_ref.clone()),
-                        field_type,
-                    );
-                    locations.push(location);
-                }
-            }
-        }
-        debug!("Building FieldCursor: locations: {:?}", &locations);
-        timer.done();
-
-        Ok(FieldCursor {
-            name: field_name,
-            value_type: field_type,
-            cache_index: 0,
-            cache_data,
-            locations,
-        })
+        Ok(cursor)
     }
 
     async fn collect_row_data(&mut self, builders: &mut [ArrayBuilderPtr]) -> Result<Option<()>> {
         trace::trace!("======collect_row_data=========");
         let mut min_time = i64::MAX;
-        let mut row_cols = Vec::with_capacity(self.columns.len());
+
         // For each column, peek next (timestamp, value), set column_values, and
         // specify the next min_time (if column is a `Field`).
-        for col_cursor in self.columns.iter_mut() {
-            let ts_val = col_cursor.peek().await?;
-            if let Some(ref d) = ts_val {
+        for (col_cursor, row_col) in self.columns.iter_mut().zip(self.row_cols.iter_mut()) {
+            if !col_cursor.is_field() || (col_cursor.is_field() && row_col.is_none()) {
+                *row_col = col_cursor.next().await?;
+            }
+            if let Some(ref d) = row_col {
                if col_cursor.is_field() {
                    min_time = min_num(min_time, d.timestamp());
                }
            }
-            row_cols.push(ts_val)
        }
 
         // For the specified min_time, fill each column data.
-        // If a column data is for later time, just set it None.
+        // Columns whose data matches min_time get min_time_column_flag set;
+        // the rest keep their cached value and are filled with null below.
+        let mut min_time_column_flag = vec![false; self.columns.len()];
         let mut test_collected_col_num = 0_usize;
-        for (col_cursor, ts_val) in self.columns.iter_mut().zip(row_cols.iter_mut()) {
+        for ((col_cursor, ts_val), min_flag) in self
+            .columns
+            .iter_mut()
+            .zip(self.row_cols.iter_mut())
+            .zip(min_time_column_flag.iter_mut())
+        {
             trace::trace!("field: {}, value: {:?}", col_cursor.name(), ts_val);
             if !col_cursor.is_field() {
                 continue;
             }
@@ -1275,34 +1442,50 @@ impl SeriesGroupRowIterator {
                 let ts = d.timestamp();
                 if ts == min_time {
                     test_collected_col_num += 1;
-                    col_cursor.next(ts).await;
-                } else {
-                    *ts_val = None;
+                    *min_flag = true;
                 }
             }
         }
 
         // Step field_scan completed.
         trace::trace!(
-            "Collected data, series_id: {}, column count: {test_collected_col_num}, timestamp: {min_time}",
-            self.series_ids[self.i - 1],
-        );
+            "Collected data, series_id: {}, column count: {test_collected_col_num}, timestamp: {min_time}",
+            self.series_ids[self.i - 1],
+        );
+
         if min_time == i64::MAX {
             // If peeked no data, return.
             self.columns.clear();
             return Ok(None);
         }
 
-        for (i, value) in row_cols.into_iter().enumerate() {
+        for (i, (value, min_flag)) in self
+            .row_cols
+            .iter_mut()
+            .zip(min_time_column_flag.iter_mut())
+            .enumerate()
+        {
             match self.columns[i].column_type() {
                 ColumnType::Time(unit) => {
                     builders[i].append_timestamp(&unit, min_time);
                 }
                 ColumnType::Tag => {
-                    builders[i].append_value(ValueType::String, value, self.columns[i].name())?;
+                    builders[i].append_value(
+                        ValueType::String,
+                        value.take(),
+                        self.columns[i].name(),
+                    )?;
                 }
                 ColumnType::Field(value_type) => {
-                    builders[i].append_value(value_type, value, self.columns[i].name())?;
+                    if *min_flag {
+                        builders[i].append_value(
+                            value_type,
+                            value.take(),
+                            self.columns[i].name(),
+                        )?;
+                    } else {
+                        builders[i].append_value(value_type, None, self.columns[i].name())?;
+                    }
                 }
             }
         }
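The `Cursor` trait change below replaces the `peek()`/`next(ts)` pair with a single pull-based `next()`; callers such as `SeriesGroupRowIterator` now park unconsumed values in their own `row_cols` cache instead of re-peeking every cursor per row. A synchronous sketch of that protocol (illustrative trait and types, not the patch's API; the real trait is async and returns `Result`):

// Each cursor is a plain stream of (timestamp, value); the caller owns the
// one-slot cache that the old peek() used to hide inside each cursor.
trait CursorLike {
    fn next(&mut self) -> Option<(i64, u64)>;
}

struct VecCursor(std::vec::IntoIter<(i64, u64)>);

impl CursorLike for VecCursor {
    fn next(&mut self) -> Option<(i64, u64)> {
        self.0.next()
    }
}

/// Pull one value per column unless a cached one is still unconsumed, then
/// return the row timestamp: the minimum over all peeked columns. Slots that
/// do not match it stay cached for the next row.
fn collect_min_row(
    cursors: &mut [Box<dyn CursorLike>],
    cache: &mut [Option<(i64, u64)>],
) -> Option<i64> {
    for (cur, slot) in cursors.iter_mut().zip(cache.iter_mut()) {
        if slot.is_none() {
            *slot = cur.next();
        }
    }
    cache.iter().flatten().map(|(ts, _)| *ts).min()
}

fn main() {
    let mut cursors: Vec<Box<dyn CursorLike>> = vec![
        Box::new(VecCursor(vec![(1, 10), (3, 30)].into_iter())),
        Box::new(VecCursor(vec![(2, 20), (3, 31)].into_iter())),
    ];
    let mut cache = vec![None; 2];
    assert_eq!(collect_min_row(&mut cursors, &mut cache), Some(1));
}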
diff --git a/tskv/src/reader/mod.rs b/tskv/src/reader/mod.rs
index b2361b7be6..62c76fd49e 100644
--- a/tskv/src/reader/mod.rs
+++ b/tskv/src/reader/mod.rs
@@ -24,6 +24,5 @@ pub trait Cursor: Send + Sync {
         matches!(self.column_type(), PhysicalCType::Field(_))
     }
     fn column_type(&self) -> PhysicalCType;
-    async fn next(&mut self, ts: i64);
-    async fn peek(&mut self) -> Result<Option<DataType>, Error>;
+    async fn next(&mut self) -> Result<Option<DataType>, Error>;
 }
diff --git a/tskv/src/tsm/block.rs b/tskv/src/tsm/block.rs
index c3bfb89ad4..9a9734ba7d 100644
--- a/tskv/src/tsm/block.rs
+++ b/tskv/src/tsm/block.rs
@@ -1,9 +1,9 @@
 use std::cmp::min;
 use std::error::Error;
-use std::fmt::{Display, Formatter};
+use std::fmt::{Debug, Display, Formatter};
 
 use minivec::MiniVec;
-use models::predicate::domain::TimeRange;
+use models::predicate::domain::{TimeRange, TimeRanges};
 use models::{PhysicalDType as ValueType, Timestamp};
 use trace::error;
 
@@ -836,6 +836,116 @@ fn exclude_slow(v: &mut Vec<MiniVec<u8>>, min_idx: usize, max_idx: usize) {
     v.truncate(len);
 }
 
+pub struct DataBlockReader {
+    data_block: DataBlock,
+    idx: usize,
+    end_idx: usize,
+    intersected_time_ranges: TimeRanges,
+    intersected_time_ranges_i: usize,
+}
+
+impl Debug for DataBlockReader {
+    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+        f.debug_struct("DataBlockReader")
+            .field("data_block_range", &self.data_block.time_range())
+            .field("idx", &self.idx)
+            .field("end_idx", &self.end_idx)
+            .field("intersected_time_ranges", &self.intersected_time_ranges)
+            .field("intersected_time_ranges_i", &self.intersected_time_ranges_i)
+            .finish()
+    }
+}
+
+impl Default for DataBlockReader {
+    fn default() -> Self {
+        Self {
+            data_block: DataBlock::Bool {
+                ts: vec![],
+                val: vec![],
+                enc: Default::default(),
+            },
+            idx: 1,
+            end_idx: 0,
+            intersected_time_ranges: TimeRanges::empty(),
+            intersected_time_ranges_i: 0,
+        }
+    }
+}
+
+impl DataBlockReader {
+    pub fn new_uninit(value_type: ValueType) -> Self {
+        let data_block = DataBlock::new(0, value_type);
+        Self {
+            data_block,
+            idx: 1,
+            end_idx: 0,
+            intersected_time_ranges: TimeRanges::empty(),
+            intersected_time_ranges_i: 0,
+        }
+    }
+
+    pub fn new(data_block: DataBlock, time_ranges: TimeRanges) -> Self {
+        let mut res = Self {
+            data_block,
+            idx: 1,
+            end_idx: 0,
+            intersected_time_ranges: time_ranges,
+            intersected_time_ranges_i: 0,
+        };
+        if res.set_index_from_time_ranges() {
+            res
+        } else {
+            res.idx = usize::MAX;
+            res
+        }
+    }
+
+    /// Iterates the remaining `TimeRange`s in `intersected_time_ranges`; if there
+    /// are none remaining, returns false.
+    ///
+    /// If a time range overlaps with the DataBlock, sets the iteration range of
+    /// `data_block` and returns true; otherwise leaves the iteration range as the
+    /// zero-length range `[1, 0]` and returns false.
+    fn set_index_from_time_ranges(&mut self) -> bool {
+        if self.intersected_time_ranges.is_empty()
+            || self.intersected_time_ranges_i >= self.intersected_time_ranges.len()
+        {
+            false
+        } else {
+            let tr_idx_start = self.intersected_time_ranges_i;
+            for tr in self.intersected_time_ranges.time_ranges()[tr_idx_start..].iter() {
+                self.intersected_time_ranges_i += 1;
+                // Check if the DataBlock matches one of the intersected time ranges.
+                // TODO: sometimes the comparison in the loop can stop early.
+                if let Some((min, max)) = self.data_block.index_range(tr) {
+                    self.idx = min;
+                    self.end_idx = max;
+                    return true;
+                }
+            }
+            false
+        }
+    }
+
+    pub fn has_next(&mut self) -> bool {
+        if self.idx > self.end_idx && !self.set_index_from_time_ranges() {
+            return false;
+        }
+        self.idx < self.data_block.len()
+    }
+}
+
+impl Iterator for DataBlockReader {
+    type Item = DataType;
+
+    fn next(&mut self) -> Option<Self::Item> {
+        // Guard against a failed range switch: without the early return,
+        // get(idx) could yield data beyond the last intersected window.
+        if self.idx > self.end_idx && !self.set_index_from_time_ranges() {
+            return None;
+        }
+        let res = self.data_block.get(self.idx);
+        self.idx += 1;
+        res
+    }
+}
+
 #[derive(Debug, Clone)]
 pub struct EncodedDataBlock {
     pub ts: Vec<u8>,
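`DataBlockReader` turns a decoded block plus its intersected time ranges into a plain `Iterator`: it walks one index window per time range and loads the next window lazily. A standalone model of that contract (illustrative names, not part of the patch; binary search via `partition_point` stands in for `DataBlock::index_range`):

/// Walk a sorted timestamp column, yielding only the values that fall inside
/// a set of inclusive time ranges, advancing range-by-range.
struct WindowIter<'a> {
    ts: &'a [i64],
    ranges: &'a [(i64, i64)], // inclusive (min_ts, max_ts), sorted
    range_i: usize,
    idx: usize,
    end_idx: usize, // inclusive
}

impl<'a> WindowIter<'a> {
    fn new(ts: &'a [i64], ranges: &'a [(i64, i64)]) -> Self {
        // Start with the empty window `[1, 0]` so the first next() loads a range.
        Self { ts, ranges, range_i: 0, idx: 1, end_idx: 0 }
    }

    fn load_next_range(&mut self) -> bool {
        while let Some(&(min_ts, max_ts)) = self.ranges.get(self.range_i) {
            self.range_i += 1;
            let start = self.ts.partition_point(|&t| t < min_ts);
            let end = self.ts.partition_point(|&t| t <= max_ts);
            if start < end {
                self.idx = start;
                self.end_idx = end - 1;
                return true;
            }
        }
        false
    }
}

impl<'a> Iterator for WindowIter<'a> {
    type Item = i64;

    fn next(&mut self) -> Option<i64> {
        if self.idx > self.end_idx && !self.load_next_range() {
            return None;
        }
        let v = self.ts[self.idx];
        self.idx += 1;
        Some(v)
    }
}

fn main() {
    let ts = [1, 2, 3, 5, 8, 13];
    let it = WindowIter::new(&ts, &[(2, 3), (6, 20)]);
    assert_eq!(it.collect::<Vec<_>>(), vec![2, 3, 8, 13]);
}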