improve: Optimize read performance
* Utilizes L1-L4 data characteristics (column files in these levels are sorted and non-overlapping) to optimize read performance
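Context for the diff below: the commit splits the former LevelTSDataStream into two readers. Level 0 column files may overlap in time, so Level0TSDataStream keeps a BinaryHeap-based k-way merge; level 1-4 files are sorted and non-overlapping (the diff adds a debug assertion for exactly this), so Level14TSDataStream can drain one file after the next with no heap and no Box<dyn Iterator> indirection. A minimal, self-contained sketch of the two strategies, with plain Vec<i64> runs standing in for the project's FieldFileLocation cursors:

use std::cmp::Reverse;
use std::collections::BinaryHeap;

// Overlapping runs (like L0 files): k-way merge through a min-heap.
fn merge_overlapping(runs: Vec<Vec<i64>>) -> Vec<i64> {
    let mut iters: Vec<std::vec::IntoIter<i64>> =
        runs.into_iter().map(|r| r.into_iter()).collect();
    let mut heap = BinaryHeap::new();
    for (idx, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, idx))); // smallest timestamp on top
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, idx))) = heap.pop() {
        out.push(v);
        if let Some(next) = iters[idx].next() {
            heap.push(Reverse((next, idx))); // refill from the run just popped
        }
    }
    out
}

// Sorted, non-overlapping runs (like L1-L4 files): concatenation is enough.
fn merge_sorted_disjoint(runs: Vec<Vec<i64>>) -> Vec<i64> {
    runs.into_iter().flatten().collect() // no heap, no per-element comparisons
}

Skipping the heap on the L1-L4 path removes an O(log k) comparison per element, which is where the read-performance win comes from.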
ZuoTiJia committed Sep 5, 2023
1 parent d325e33 commit f831492
183 changes: 132 additions & 51 deletions tskv/src/reader/iterator.rs
@@ -1,6 +1,5 @@
 use std::cmp::{Ordering, Reverse};
-use std::collections::BinaryHeap;
-use std::mem::MaybeUninit;
+use std::collections::{BinaryHeap, HashSet};
 use std::sync::Arc;
 
 use datafusion::arrow::array::{
@@ -439,16 +438,18 @@ impl Ord for DataTypeWithFileId {
     }
 }
 
-pub struct LevelTSDataStream {
-    field_file_locations: Vec<Box<dyn Iterator<Item = FieldFileLocation> + Send + 'static>>,
+pub struct Level0TSDataStream {
+    field_file_locations: Vec<std::vec::IntoIter<FieldFileLocation>>,
     peeked_file_locations: Vec<Option<FieldFileLocation>>,
     //
     data_heap: BinaryHeap<Reverse<DataTypeWithFileId>>,
     cached_data_type: Option<DataTypeWithFileId>,
 }
 
-unsafe impl Send for LevelTSDataStream {}
-unsafe impl Sync for LevelTSDataStream {}
+pub struct Level14TSDataStream {
+    field_file_location: std::vec::IntoIter<FieldFileLocation>,
+    peeked_file_locations: Option<FieldFileLocation>,
+}
 
 async fn open_field_file_location(
     column_file: Arc<ColumnFile>,
@@ -472,17 +473,65 @@ async fn open_field_file_location(
     Ok(res)
 }
 
-impl LevelTSDataStream {
+impl Level14TSDataStream {
     pub async fn new(
         version: Arc<Version>,
         time_ranges: Arc<TimeRanges>,
         column_files: Vec<Arc<ColumnFile>>,
         field_id: FieldId,
         value_type: ValueType,
     ) -> Result<Self> {
+        let file_location_futures = column_files.into_iter().map(move |f| {
+            open_field_file_location(
+                f,
+                version.clone(),
+                time_ranges.clone(),
+                field_id,
+                value_type,
+            )
+        });
+        let file_locations = join_all(file_location_futures)
+            .await
+            .into_iter()
+            .collect::<Result<Vec<Vec<FieldFileLocation>>>>()?
+            .into_iter()
+            .flatten()
+            .collect::<Vec<FieldFileLocation>>()
+            .into_iter();
+        Ok(Self {
+            field_file_location: file_locations,
+            peeked_file_locations: None,
+        })
+    }
+
+    async fn next_data(&mut self) -> Result<Option<DataType>> {
+        loop {
+            match &mut self.peeked_file_locations {
+                None => match self.field_file_location.next() {
+                    None => return Ok(None),
+                    Some(location) => {
+                        self.peeked_file_locations.replace(location);
+                    }
+                },
+                Some(location) => match location.next_data().await? {
+                    None => {
+                        self.peeked_file_locations.take();
+                    }
+                    other => return Ok(other),
+                },
+            }
+        }
+    }
+}
+
+impl Level0TSDataStream {
+    pub async fn new(
+        version: Arc<Version>,
+        time_ranges: Arc<TimeRanges>,
+        column_files: Vec<Arc<ColumnFile>>,
+        field_id: FieldId,
+        value_type: ValueType,
+    ) -> Result<Self> {
-        let mut field_file_locations: Vec<
-            Box<dyn Iterator<Item = FieldFileLocation> + 'static + Send>,
-        > = vec![];
         let locations_future = column_files.into_iter().map(|f| {
             open_field_file_location(
                 f,
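The next_data loop in Level14TSDataStream above is in effect a hand-rolled async flatten: hold the currently open FieldFileLocation in peeked_file_locations, drain it, and only advance to the next file when it runs dry. Because the L1-L4 files are disjoint and sorted, this already yields globally ordered data. A synchronous sketch of the same control flow (ChainedRuns and its fields are illustrative stand-ins, not project types):

struct ChainedRuns<T> {
    sources: std::vec::IntoIter<Vec<T>>,    // plays the role of field_file_location
    current: Option<std::vec::IntoIter<T>>, // plays the role of peeked_file_locations
}

impl<T> ChainedRuns<T> {
    fn next_data(&mut self) -> Option<T> {
        loop {
            match &mut self.current {
                // Nothing open: open the next source, or stop for good.
                None => match self.sources.next() {
                    None => return None,
                    Some(run) => {
                        self.current.replace(run.into_iter());
                    }
                },
                // A run is open: drain it, dropping it once empty.
                Some(run) => match run.next() {
                    None => {
                        self.current.take();
                    }
                    other => return other,
                },
            }
        }
    }
}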
@@ -492,14 +541,13 @@ impl LevelTSDataStream {
                 value_type,
             )
         });
-        for ls in join_all(locations_future)
+        let field_file_locations = join_all(locations_future)
             .await
             .into_iter()
             .collect::<Result<Vec<_>>>()?
             .into_iter()
-        {
-            field_file_locations.push(Box::new(ls.into_iter()))
-        }
+            .map(|ls| ls.into_iter())
+            .collect::<Vec<_>>();
         let peeked_file_locations = field_file_locations.iter().map(|_| None).collect();
 
         Ok(Self {
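Note the construction change in the hunk above: the old code pushed each file's locations into a Vec as a Box<dyn Iterator>, while the new code collects concrete std::vec::IntoIter values directly. A small illustration of the difference (illustrative values, not project code):

// Boxed trait objects (old shape): every next() call is dynamically dispatched.
let boxed: Vec<Box<dyn Iterator<Item = i64>>> = vec![Box::new(vec![1, 2].into_iter())];

// Concrete iterators (new shape): statically dispatched, one allocation fewer per file.
let concrete: Vec<std::vec::IntoIter<i64>> = vec![vec![1, 2].into_iter()];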
@@ -705,7 +753,9 @@ pub struct FieldCursor {
 
     cache_data: Box<dyn Iterator<Item = DataType> + 'static>,
     peeked_cache: Option<DataType>,
-    level_data_stream: [Option<LevelTSDataStream>; 5],
+
+    level0_data_stream: Option<Level0TSDataStream>,
+    level14_data_stream: Option<Level14TSDataStream>,
 
     peeked_l0: Option<DataType>,
     peeked_l14: Option<DataType>,
@@ -716,44 +766,43 @@ unsafe impl Send for FieldCursor {}
 
 impl FieldCursor {
     pub fn empty(value_type: ValueType, name: Arc<String>) -> Self {
-        let mut level_data_stream_uninit = MaybeUninit::uninit_array();
-        level_data_stream_uninit.iter_mut().for_each(|u| {
-            u.write(None);
-        });
         Self {
             name,
             value_type,
             cache_data: Box::new(std::iter::empty()),
             peeked_cache: None,
+            level14_data_stream: None,
+            level0_data_stream: None,
             peeked_l0: None,
             peeked_l14: None,
-            level_data_stream: unsafe { MaybeUninit::array_assume_init(level_data_stream_uninit) },
         }
     }
 
     pub fn new(
         name: Arc<String>,
         value_type: ValueType,
         cache_data: Box<dyn Iterator<Item = DataType> + 'static>,
-        level_data_stream: [Option<LevelTSDataStream>; 5],
+        level0_data_stream: Option<Level0TSDataStream>,
+        level14_data_stream: Option<Level14TSDataStream>,
     ) -> Self {
         Self {
             name,
             value_type,
             cache_data,
             peeked_cache: None,
+            level0_data_stream,
+            level14_data_stream,
             peeked_l0: None,
             peeked_l14: None,
-            level_data_stream,
         }
     }
 
     async fn next_l0_data(&mut self) -> Result<Option<DataType>> {
-        match self.level_data_stream[0].as_mut() {
+        match &mut self.level0_data_stream {
             None => Ok(None),
             Some(stream) => match stream.next_data().await? {
                 None => {
-                    self.level_data_stream[0].as_mut().take();
+                    self.level0_data_stream.take();
                     Ok(None)
                 }
                 other => Ok(other),
@@ -762,21 +811,16 @@ impl FieldCursor {
     }
 
     async fn next_l14_data(&mut self) -> Result<Option<DataType>> {
-        for stream_opt in self.level_data_stream.iter_mut().skip(1).rev() {
-            match stream_opt {
-                None => {
-                    continue;
-                }
-                Some(stream) => match stream.next_data().await? {
-                    None => {
-                        *stream_opt = None;
-                        continue;
-                    }
-                    other => return Ok(other),
-                },
-            }
-        }
-        Ok(None)
+        match &mut self.level14_data_stream {
+            None => Ok(None),
+            Some(stream) => match stream.next_data().await? {
+                None => {
+                    self.level14_data_stream.take();
+                    Ok(None)
+                }
+                other => Ok(other),
+            },
+        }
     }
 }
 
@@ -1331,25 +1375,61 @@ impl SeriesGroupRowIterator {
         time_ranges: Arc<TimeRanges>,
         field_id: FieldId,
         value_type: ValueType,
-    ) -> Result<[Option<LevelTSDataStream>; 5]> {
-        let mut res_uninit = MaybeUninit::uninit_array();
-        let level_files = version.get_level_files(&time_ranges, field_id);
-        for (res, level_file) in res_uninit.iter_mut().zip(level_files) {
-            if let Some(files) = level_file {
-                let stream = LevelTSDataStream::new(
+    ) -> Result<(Option<Level0TSDataStream>, Option<Level14TSDataStream>)> {
+        let mut level_files = version.get_level_files(&time_ranges, field_id);
+
+        let l0 = match level_files[0].take() {
+            Some(fs) => Some(
+                Level0TSDataStream::new(
                     version.clone(),
                     time_ranges.clone(),
-                    files,
+                    fs,
                     field_id,
                     value_type,
                 )
-                .await?;
-                res.write(Some(stream));
-            } else {
-                res.write(None);
-            }
-        }
-        Ok(unsafe { MaybeUninit::array_assume_init(res_uninit) })
+                .await?,
+            ),
+            None => None,
+        };
+        let fs: Vec<Arc<ColumnFile>> = level_files
+            .into_iter()
+            .skip(1)
+            .rev()
+            .flatten()
+            .flatten()
+            .collect::<Vec<_>>();
+
+        // assert that the column files of levels 1-4 do not overlap and are sorted
+        if cfg!(debug_assertions) {
+            debug!("debug assertion level file l1 l4");
+            let time_range = fs.iter().map(|a| *a.time_range()).collect::<Vec<_>>();
+
+            let mut time_range_cp = time_range
+                .clone()
+                .into_iter()
+                .collect::<HashSet<_>>()
+                .into_iter()
+                .collect::<Vec<_>>();
+            time_range_cp.sort();
+            debug_assert!(time_range.len().eq(&time_range_cp.len()));
+            debug_assert!(time_range_cp.eq(&time_range));
+        }
+
+        let l14 = if fs.is_empty() {
+            None
+        } else {
+            Some(
+                Level14TSDataStream::new(
+                    version.clone(),
+                    time_ranges.clone(),
+                    fs,
+                    field_id,
+                    value_type,
+                )
+                .await?,
+            )
+        };
+        Ok((l0, l14))
     }
 
     /// Build a FieldCursor with cached data and file locations.
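The cfg!(debug_assertions) block above checks the precondition the chained L1-L4 read relies on: the collected time ranges must be unique and already in sorted order. It does this by deduplicating through a HashSet, sorting the copy, and comparing length and contents against the original. The same idiom in isolation (is_sorted_unique is a hypothetical helper, not project code):

use std::collections::HashSet;
use std::hash::Hash;

// True when `items` is strictly increasing: sorted, with no duplicates.
fn is_sorted_unique<T: Ord + Hash + Clone>(items: &[T]) -> bool {
    let mut dedup: Vec<T> = items
        .iter()
        .cloned()
        .collect::<HashSet<_>>() // drops duplicates
        .into_iter()
        .collect();
    dedup.sort();
    // Same length: no duplicates existed; equal contents: input was sorted.
    dedup.len() == items.len() && dedup.as_slice() == items
}

// is_sorted_unique(&[1, 3, 5]) == true
// is_sorted_unique(&[3, 1]) == false (not sorted); &[1, 1, 2] fails on the duplicate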
@@ -1387,7 +1467,7 @@ impl SeriesGroupRowIterator {
         );
         let cache_data_iter = cache_data.into_iter().rev();
 
-        let level_ts_stream = self
+        let (l0_stream, l14_stream) = self
             .build_level_ts_stream(
                 super_version.version.clone(),
                 time_ranges_ref.clone(),
@@ -1399,7 +1479,8 @@ impl SeriesGroupRowIterator {
             field_name.clone(),
             field_type,
             Box::new(cache_data_iter),
-            level_ts_stream,
+            l0_stream,
+            l14_stream,
         );
 
         Ok(cursor)