diff --git a/tskv/src/reader/iterator.rs b/tskv/src/reader/iterator.rs index a4bf273c5..d29b8536b 100644 --- a/tskv/src/reader/iterator.rs +++ b/tskv/src/reader/iterator.rs @@ -256,23 +256,17 @@ impl ArrayBuilderPtr { /// Stores metrics about the table writer execution. #[derive(Debug, Clone)] pub struct SeriesGroupRowIteratorMetrics { - elapsed_point_to_record_batch: metrics::Time, - elapsed_field_scan: metrics::Time, elapsed_series_scan: metrics::Time, elapsed_build_resp_stream: metrics::Time, elapsed_get_data_from_memcache: metrics::Time, elapsed_get_field_location: metrics::Time, + elapsed_collect_row_time: metrics::Time, + elapsed_collect_aggregate_time: metrics::Time, } impl SeriesGroupRowIteratorMetrics { /// Create new metrics pub fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { - let elapsed_point_to_record_batch = - MetricBuilder::new(metrics).subset_time("elapsed_point_to_record_batch", partition); - - let elapsed_field_scan = - MetricBuilder::new(metrics).subset_time("elapsed_field_scan", partition); - let elapsed_series_scan = MetricBuilder::new(metrics).subset_time("elapsed_series_scan", partition); @@ -285,24 +279,22 @@ impl SeriesGroupRowIteratorMetrics { let elapsed_get_field_location = MetricBuilder::new(metrics).subset_time("elapsed_get_field_location", partition); + let elapsed_collect_row_time = + MetricBuilder::new(metrics).subset_time("elapsed_collect_row_time", partition); + + let elapsed_collect_aggregate_time = + MetricBuilder::new(metrics).subset_time("elapsed_collect_aggregate_time", partition); + Self { - elapsed_point_to_record_batch, - elapsed_field_scan, elapsed_series_scan, elapsed_build_resp_stream, elapsed_get_data_from_memcache, elapsed_get_field_location, + elapsed_collect_row_time, + elapsed_collect_aggregate_time, } } - pub fn elapsed_point_to_record_batch(&self) -> &metrics::Time { - &self.elapsed_point_to_record_batch - } - - pub fn elapsed_field_scan(&self) -> &metrics::Time { - &self.elapsed_field_scan - } - pub fn elapsed_series_scan(&self) -> &metrics::Time { &self.elapsed_series_scan } @@ -318,6 +310,14 @@ impl SeriesGroupRowIteratorMetrics { pub fn elapsed_get_field_location(&self) -> &metrics::Time { &self.elapsed_get_field_location } + + pub fn elapsed_collect_row_time(&self) -> &metrics::Time { + &self.elapsed_collect_row_time + } + + pub fn elapsed_collect_aggregate_time(&self) -> &metrics::Time { + &self.elapsed_collect_aggregate_time + } } // 1. Tsm文件遍历: KeyCursor @@ -1017,13 +1017,19 @@ impl SeriesGroupRowIterator { if self.is_finished { return None; } - // record elapsed_point_to_record_batch - let timer = self.metrics.elapsed_point_to_record_batch().timer(); + let mut builders = match RowIterator::build_record_builders(self.query_option.as_ref()) { Ok(builders) => builders, Err(e) => return Some(Err(e)), }; - timer.done(); + + // record fetch_next_row time + let timer = if self.query_option.aggregates.is_some() { + self.metrics.elapsed_collect_aggregate_time().clone() + } else { + self.metrics.elapsed_collect_row_time().clone() + }; + let timer_guard = timer.timer(); for _ in 0..self.batch_size { match self.fetch_next_row(&mut builders).await { @@ -1035,24 +1041,20 @@ impl SeriesGroupRowIterator { Err(err) => return Some(Err(err)), }; } - // record elapsed_point_to_record_batch - let timer = self.metrics.elapsed_point_to_record_batch().timer(); - let result = { - let mut cols = Vec::with_capacity(builders.len()); - for builder in builders.iter_mut() { - cols.push(builder.ptr.finish()) - } - match RecordBatch::try_new(self.query_option.df_schema.clone(), cols) { - Ok(batch) => Some(Ok(batch)), - Err(err) => Some(Err(Error::CommonError { - reason: format!("iterator fail, {}", err), - })), - } - }; - timer.done(); + timer_guard.done(); - result + let mut cols = Vec::with_capacity(builders.len()); + for builder in builders.iter_mut() { + cols.push(builder.ptr.finish()) + } + + match RecordBatch::try_new(self.query_option.df_schema.clone(), cols) { + Ok(batch) => Some(Ok(batch)), + Err(err) => Some(Err(Error::CommonError { + reason: format!("iterator fail, {}", err), + })), + } } } @@ -1245,8 +1247,6 @@ impl SeriesGroupRowIterator { async fn collect_row_data(&mut self, builders: &mut [ArrayBuilderPtr]) -> Result> { trace::trace!("======collect_row_data========="); - // Record elapsed_field_scan - let timer = self.metrics.elapsed_field_scan().timer(); let mut min_time = i64::MAX; let mut row_cols = Vec::with_capacity(self.columns.len()); @@ -1283,7 +1283,6 @@ impl SeriesGroupRowIterator { } // Step field_scan completed. - timer.done(); trace::trace!( "Collected data, series_id: {}, column count: {test_collected_col_num}, timestamp: {min_time}", self.series_ids[self.i - 1], @@ -1294,9 +1293,6 @@ impl SeriesGroupRowIterator { return Ok(None); } - // Record elapsed_point_to_record_batch - let timer = self.metrics.elapsed_point_to_record_batch().timer(); - for (i, value) in row_cols.into_iter().enumerate() { match self.columns[i].column_type() { ColumnType::Time(unit) => { @@ -1311,8 +1307,6 @@ impl SeriesGroupRowIterator { } } - timer.done(); - Ok(Some(())) }