Skip to content

Commit

Permalink
refactor: remove field scan timer
Browse files Browse the repository at this point in the history
* remove points to record batch timer
  • Loading branch information
ZuoTiJia committed Sep 5, 2023
1 parent 75fc369 commit 41cd38e
Showing 1 changed file with 39 additions and 45 deletions.
84 changes: 39 additions & 45 deletions tskv/src/reader/iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,23 +256,17 @@ impl ArrayBuilderPtr {
/// Stores metrics about the table writer execution.
#[derive(Debug, Clone)]
pub struct SeriesGroupRowIteratorMetrics {
elapsed_point_to_record_batch: metrics::Time,
elapsed_field_scan: metrics::Time,
elapsed_series_scan: metrics::Time,
elapsed_build_resp_stream: metrics::Time,
elapsed_get_data_from_memcache: metrics::Time,
elapsed_get_field_location: metrics::Time,
elapsed_collect_row_time: metrics::Time,
elapsed_collect_aggregate_time: metrics::Time,
}

impl SeriesGroupRowIteratorMetrics {
/// Create new metrics
pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self {
let elapsed_point_to_record_batch =
MetricBuilder::new(metrics).subset_time("elapsed_point_to_record_batch", partition);

let elapsed_field_scan =
MetricBuilder::new(metrics).subset_time("elapsed_field_scan", partition);

let elapsed_series_scan =
MetricBuilder::new(metrics).subset_time("elapsed_series_scan", partition);

Expand All @@ -285,24 +279,22 @@ impl SeriesGroupRowIteratorMetrics {
let elapsed_get_field_location =
MetricBuilder::new(metrics).subset_time("elapsed_get_field_location", partition);

let elapsed_collect_row_time =
MetricBuilder::new(metrics).subset_time("elapsed_collect_row_time", partition);

let elapsed_collect_aggregate_time =
MetricBuilder::new(metrics).subset_time("elapsed_collect_aggregate_time", partition);

Self {
elapsed_point_to_record_batch,
elapsed_field_scan,
elapsed_series_scan,
elapsed_build_resp_stream,
elapsed_get_data_from_memcache,
elapsed_get_field_location,
elapsed_collect_row_time,
elapsed_collect_aggregate_time,
}
}

pub fn elapsed_point_to_record_batch(&self) -> &metrics::Time {
&self.elapsed_point_to_record_batch
}

pub fn elapsed_field_scan(&self) -> &metrics::Time {
&self.elapsed_field_scan
}

pub fn elapsed_series_scan(&self) -> &metrics::Time {
&self.elapsed_series_scan
}
Expand All @@ -318,6 +310,14 @@ impl SeriesGroupRowIteratorMetrics {
pub fn elapsed_get_field_location(&self) -> &metrics::Time {
&self.elapsed_get_field_location
}

pub fn elapsed_collect_row_time(&self) -> &metrics::Time {
&self.elapsed_collect_row_time
}

pub fn elapsed_collect_aggregate_time(&self) -> &metrics::Time {
&self.elapsed_collect_aggregate_time
}
}

// 1. Tsm文件遍历: KeyCursor
Expand Down Expand Up @@ -1017,13 +1017,19 @@ impl SeriesGroupRowIterator {
if self.is_finished {
return None;
}
// record elapsed_point_to_record_batch
let timer = self.metrics.elapsed_point_to_record_batch().timer();

let mut builders = match RowIterator::build_record_builders(self.query_option.as_ref()) {
Ok(builders) => builders,
Err(e) => return Some(Err(e)),
};
timer.done();

// record fetch_next_row time
let timer = if self.query_option.aggregates.is_some() {
self.metrics.elapsed_collect_aggregate_time().clone()
} else {
self.metrics.elapsed_collect_row_time().clone()
};
let timer_guard = timer.timer();

for _ in 0..self.batch_size {
match self.fetch_next_row(&mut builders).await {
Expand All @@ -1035,24 +1041,20 @@ impl SeriesGroupRowIterator {
Err(err) => return Some(Err(err)),
};
}
// record elapsed_point_to_record_batch
let timer = self.metrics.elapsed_point_to_record_batch().timer();
let result = {
let mut cols = Vec::with_capacity(builders.len());
for builder in builders.iter_mut() {
cols.push(builder.ptr.finish())
}

match RecordBatch::try_new(self.query_option.df_schema.clone(), cols) {
Ok(batch) => Some(Ok(batch)),
Err(err) => Some(Err(Error::CommonError {
reason: format!("iterator fail, {}", err),
})),
}
};
timer.done();
timer_guard.done();

result
let mut cols = Vec::with_capacity(builders.len());
for builder in builders.iter_mut() {
cols.push(builder.ptr.finish())
}

match RecordBatch::try_new(self.query_option.df_schema.clone(), cols) {
Ok(batch) => Some(Ok(batch)),
Err(err) => Some(Err(Error::CommonError {
reason: format!("iterator fail, {}", err),
})),
}
}
}

Expand Down Expand Up @@ -1245,8 +1247,6 @@ impl SeriesGroupRowIterator {

async fn collect_row_data(&mut self, builders: &mut [ArrayBuilderPtr]) -> Result<Option<()>> {
trace::trace!("======collect_row_data=========");
// Record elapsed_field_scan
let timer = self.metrics.elapsed_field_scan().timer();

let mut min_time = i64::MAX;
let mut row_cols = Vec::with_capacity(self.columns.len());
Expand Down Expand Up @@ -1283,7 +1283,6 @@ impl SeriesGroupRowIterator {
}

// Step field_scan completed.
timer.done();
trace::trace!(
"Collected data, series_id: {}, column count: {test_collected_col_num}, timestamp: {min_time}",
self.series_ids[self.i - 1],
Expand All @@ -1294,9 +1293,6 @@ impl SeriesGroupRowIterator {
return Ok(None);
}

// Record elapsed_point_to_record_batch
let timer = self.metrics.elapsed_point_to_record_batch().timer();

for (i, value) in row_cols.into_iter().enumerate() {
match self.columns[i].column_type() {
ColumnType::Time(unit) => {
Expand All @@ -1311,8 +1307,6 @@ impl SeriesGroupRowIterator {
}
}

timer.done();

Ok(Some(()))
}

Expand Down

0 comments on commit 41cd38e

Please sign in to comment.