Refactor iterator and optimize performance #1467

Merged
5 commits merged on Sep 5, 2023
Changes from all commits
2 changes: 1 addition & 1 deletion Cargo.toml
@@ -198,7 +198,7 @@ overflow-checks = true
 panic = 'unwind'

 [profile.test-ci]
-debug-assertions = false
+debug-assertions = true
 incremental = true
 inherits = "test"
 overflow-checks = false
14 changes: 7 additions & 7 deletions common/models/src/predicate/domain.rs
@@ -242,12 +242,8 @@ impl TimeRanges {
     }

     pub fn overlaps(&self, time_range: &TimeRange) -> bool {
-        for tr in self.inner.iter() {
-            if tr.overlaps(time_range) {
-                return true;
-            }
-        }
-        false
+        self.max_time_range().overlaps(time_range)
+            && self.inner.iter().any(|tr| tr.overlaps(time_range))
     }

     pub fn includes(&self, time_range: &TimeRange) -> bool {
@@ -324,6 +320,10 @@ impl TimeRanges {

         None
     }
+
+    pub fn max_time_range(&self) -> TimeRange {
+        TimeRange::new(self.min_ts, self.max_ts)
+    }
 }

 impl AsRef<[TimeRange]> for TimeRanges {
@@ -1465,7 +1465,7 @@ impl QueryArgs {
 pub struct QueryExpr {
     pub split: PlacedSplit,
     pub df_schema: Schema,
-    pub table_schema: TskvTableSchema,
+    pub table_schema: TskvTableSchemaRef,
 }

 impl QueryExpr {
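A note on the overlaps() rewrite above: the old loop scanned every stored range, while the new version first tests the overall [min_ts, max_ts] envelope and only walks the individual ranges when that cheap check passes. A self-contained sketch of the idea, using simplified stand-in types rather than the real models::predicate code:

    #[derive(Clone, Copy)]
    struct TimeRange { min_ts: i64, max_ts: i64 }

    impl TimeRange {
        fn overlaps(&self, other: &TimeRange) -> bool {
            self.min_ts <= other.max_ts && other.min_ts <= self.max_ts
        }
    }

    struct TimeRanges { inner: Vec<TimeRange>, min_ts: i64, max_ts: i64 }

    impl TimeRanges {
        fn max_time_range(&self) -> TimeRange {
            TimeRange { min_ts: self.min_ts, max_ts: self.max_ts }
        }

        fn overlaps(&self, tr: &TimeRange) -> bool {
            // Cheap envelope test first; scan the per-range list only when it passes.
            self.max_time_range().overlaps(tr) && self.inner.iter().any(|r| r.overlaps(tr))
        }
    }

For a query range that falls outside the stored bounds, this now returns false after one envelope check instead of iterating the whole list.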
2 changes: 1 addition & 1 deletion common/models/src/schema.rs
@@ -58,7 +58,7 @@ pub const DEFAULT_PRECISION: &str = "NS";

 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
 pub enum TableSchema {
-    TsKvTableSchema(Arc<TskvTableSchema>),
+    TsKvTableSchema(TskvTableSchemaRef),
     ExternalTableSchema(Arc<ExternalTableSchema>),
     StreamTableSchema(Arc<StreamTable>),
 }
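Most of the changes in this PR swap Arc<TskvTableSchema> for TskvTableSchemaRef. Judging from how the two are substituted for each other here, TskvTableSchemaRef is presumably an alias along these lines (assumed shape, not copied from models::schema):

    use std::sync::Arc;

    pub struct TskvTableSchema { /* fields elided */ }

    // Assumed alias: sharing the schema behind an Arc lets callers clone a cheap
    // pointer instead of deep-copying the whole schema on every query.
    pub type TskvTableSchemaRef = Arc<TskvTableSchema>;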
2 changes: 0 additions & 2 deletions config/src/lib.rs
@@ -1,5 +1,3 @@
-#![feature(is_some_and)]
-
 use std::fs::File;
 use std::io;
 use std::io::Read;
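Presumably the #![feature(is_some_and)] gate can be dropped because Option::is_some_and was stabilized in Rust 1.70; a quick stable-Rust check of the kind of call the gate used to cover:

    fn main() {
        let port: Option<u16> = Some(8902); // hypothetical value for illustration
        // Compiles on stable Rust >= 1.70 with no feature gate.
        assert!(port.is_some_and(|p| p > 1024));
    }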
8 changes: 5 additions & 3 deletions meta/src/model/meta_tenant.rs
@@ -11,7 +11,9 @@ use models::auth::role::{CustomTenantRole, SystemTenantRole, TenantRoleIdentifie
 use models::auth::user::UserDesc;
 use models::meta_data::*;
 use models::oid::{Identifier, Oid};
-use models::schema::{DatabaseSchema, ExternalTableSchema, TableSchema, Tenant, TskvTableSchema};
+use models::schema::{
+    DatabaseSchema, ExternalTableSchema, TableSchema, Tenant, TskvTableSchemaRef,
+};
 use parking_lot::RwLock;
 use store::command;
 use trace::info;
@@ -461,7 +463,7 @@ impl TenantMeta {
         &self,
         db: &str,
         table: &str,
-    ) -> MetaResult<Option<Arc<TskvTableSchema>>> {
+    ) -> MetaResult<Option<TskvTableSchemaRef>> {
         if let Some(TableSchema::TsKvTableSchema(val)) = self.data.read().table_schema(db, table) {
             return Ok(Some(val));
         }
@@ -472,7 +474,7 @@
         &self,
         db: &str,
         table: &str,
-    ) -> MetaResult<Option<Arc<TskvTableSchema>>> {
+    ) -> MetaResult<Option<TskvTableSchemaRef>> {
         let req = ReadCommand::TableSchema(
             self.cluster.clone(),
             self.tenant.name().to_string(),
@@ -110,7 +110,7 @@ impl ExecutionPlan for AggregateFilterTskvExec {
             split,
             Some(agg_columns),
             self.schema.clone(),
-            (*self.table_schema).clone(),
+            self.table_schema.clone(),
         );

         let span_ctx = context.session_config().get_extension::<SpanContext>();
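This pattern recurs through the PR: with table_schema now held as an Arc, self.table_schema.clone() bumps a reference count, whereas the old (*self.table_schema).clone() deep-copied the schema value. A standalone illustration with a stand-in Schema type (not the real TskvTableSchema):

    use std::sync::Arc;

    #[derive(Clone)]
    struct Schema { columns: Vec<String> } // stand-in type for illustration

    fn main() {
        let schema = Arc::new(Schema { columns: vec!["time".into(), "f0".into()] });

        let deep: Schema = (*schema).clone();     // copies the Vec and every String
        let shared: Arc<Schema> = schema.clone(); // O(1): increments the strong count

        assert_eq!(Arc::strong_count(&schema), 2);
        assert_eq!(deep.columns.len(), shared.columns.len());
    }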
@@ -260,7 +260,7 @@ impl TagScanStream {
             split,
             None,
             proj_schema.clone(),
-            proj_table_schema,
+            proj_table_schema.into(),
         );

         let span_ctx = span_recorder.span_ctx();
@@ -262,7 +262,7 @@ impl TableScanStream {
             split,
             None,
             proj_schema.clone(),
-            proj_table_schema,
+            proj_table_schema.into(),
         );

         let span_ctx = span_recorder.span_ctx();
@@ -14,7 +14,9 @@ use datafusion::physical_plan::ExecutionPlan;
 use meta::model::MetaClientRef;
 use models::auth::user::User;
 use models::oid::Identifier;
-use models::schema::{ColumnType, ExternalTableSchema, StreamTable, TableSchema, TskvTableSchema};
+use models::schema::{
+    ColumnType, ExternalTableSchema, StreamTable, TableSchema, TskvTableSchemaRef,
+};
 use models::ValueType;

 use crate::dispatcher::query_tracker::QueryTracker;
@@ -128,7 +130,7 @@ impl TableProvider for InformationColumnsTable {
 fn append_tskv_table(
     tenant_name: &str,
     database_name: &str,
-    table: Arc<TskvTableSchema>,
+    table: TskvTableSchemaRef,
     builder: &mut InformationSchemaColumnsBuilder,
 ) {
     for (idx, col) in table.columns().iter().enumerate() {
5 changes: 2 additions & 3 deletions query_server/query/src/prom/remote_server.rs
@@ -1,13 +1,12 @@
 use std::collections::HashMap;
-use std::sync::Arc;

 use async_trait::async_trait;
 use bytes::Bytes;
 use coordinator::service::CoordinatorRef;
 use datafusion::arrow::datatypes::ToByteSlice;
 use meta::error::MetaError;
 use meta::model::MetaClientRef;
-use models::schema::{TskvTableSchema, TIME_FIELD_NAME};
+use models::schema::{TskvTableSchemaRef, TIME_FIELD_NAME};
 use models::snappy::SnappyCodec;
 use protocol_parser::Line;
 use protos::models_helper::{parse_proto_bytes, to_proto_bytes};
@@ -337,7 +336,7 @@ async fn transform_time_series(
 #[derive(Debug)]
 struct SqlWithTable {
     pub sql: String,
-    pub table: Arc<TskvTableSchema>,
+    pub table: TskvTableSchemaRef,
 }

 #[cfg(test)]
1 change: 0 additions & 1 deletion query_server/test/cases/dql/only_tag_col.result
@@ -103,4 +103,3 @@ time,t0,t1,f0
 1970-01-01T00:00:00.000000302,tag12,tag27,222
 1970-01-01T00:00:00.000000303,tag13,tag28,333
 1970-01-01T00:00:00.000000304,tag14,tag29,444
-
3 changes: 2 additions & 1 deletion query_server/test/cases/dql/only_tag_col.sql
@@ -7,7 +7,8 @@ create database only_tag_col WITH TTL '100000d';
 drop table if exists m2;
 CREATE TABLE IF NOT EXISTS m2(f0 BIGINT , f1 DOUBLE , TAGS(t0, t1, t2) );

-INSERT m2(TIME, f0, f1, t0, t1) VALUES(101, 111, 444, 'tag11', 'tag21'),
+INSERT m2(TIME, f0, f1, t0, t1) VALUES
+(101, 111, 444, 'tag11', 'tag21'),
 (102, 222, 333, 'tag12', 'tag22'),
 (103, 333, 222, 'tag13', 'tag23'),
 (104, 444, 111, 'tag14', 'tag24'),
2 changes: 1 addition & 1 deletion tskv/src/compaction/check.rs
@@ -228,7 +228,7 @@ pub(crate) async fn vnode_hash_tree(
         (vnode_rlock.version(), vnode_rlock.tf_id())
     };
     let mut readers: Vec<Arc<TsmReader>> = Vec::new();
-    let tsm_paths: Vec<PathBuf> = version
+    let tsm_paths: Vec<&PathBuf> = version
         .levels_info()
         .iter()
         .flat_map(|l| l.files.iter().map(|f| f.file_path()))
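Collecting Vec<&PathBuf> instead of Vec<PathBuf> keeps borrowed paths rather than owned copies. A small stand-alone sketch of the same pattern, assuming file_path() hands out a borrowed &PathBuf as the new annotation suggests (stand-in types, not the real tskv code):

    use std::path::PathBuf;

    struct ColumnFile { path: PathBuf }

    impl ColumnFile {
        fn file_path(&self) -> &PathBuf { &self.path }
    }

    fn main() {
        let files = vec![
            ColumnFile { path: PathBuf::from("/tmp/tsm/1.tsm") },
            ColumnFile { path: PathBuf::from("/tmp/tsm/2.tsm") },
        ];
        // Borrow the paths; no PathBuf is cloned or reallocated.
        let tsm_paths: Vec<&PathBuf> = files.iter().map(|f| f.file_path()).collect();
        assert_eq!(tsm_paths.len(), 2);
    }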
4 changes: 2 additions & 2 deletions tskv/src/database.rs
@@ -8,7 +8,7 @@ use memory_pool::MemoryPoolRef;
 use meta::model::MetaRef;
 use metrics::metric_register::MetricsRegister;
 use models::predicate::domain::TimeRange;
-use models::schema::{DatabaseSchema, Precision, TskvTableSchema};
+use models::schema::{DatabaseSchema, Precision, TskvTableSchema, TskvTableSchemaRef};
 use models::{SchemaId, SeriesId, SeriesKey};
 use protos::models::{Column, ColumnType, FieldType, Table};
 use snafu::ResultExt;
@@ -463,7 +463,7 @@ impl Database {
         Ok(None)
     }

-    pub fn get_table_schema(&self, table_name: &str) -> Result<Option<Arc<TskvTableSchema>>> {
+    pub fn get_table_schema(&self, table_name: &str) -> Result<Option<TskvTableSchemaRef>> {
         Ok(self.schemas.get_table_schema(table_name)?)
     }
2 changes: 2 additions & 0 deletions tskv/src/lib.rs
@@ -1,5 +1,7 @@
 #![allow(dead_code)]
 #![allow(unreachable_patterns)]
+#![feature(maybe_uninit_array_assume_init)]
+#![feature(maybe_uninit_uninit_array)]

 use std::fmt::Debug;
 use std::sync::Arc;
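The two added gates enable nightly MaybeUninit array helpers, presumably used by the refactored iterator to build fixed-size buffers without zero-initializing them first. A minimal sketch of what the features allow (illustration only, not code from this PR):

    #![feature(maybe_uninit_uninit_array)]
    #![feature(maybe_uninit_array_assume_init)]

    use std::mem::MaybeUninit;

    fn filled() -> [u32; 4] {
        // Reserve the array without initializing its elements.
        let mut buf: [MaybeUninit<u32>; 4] = MaybeUninit::uninit_array();
        for (i, slot) in buf.iter_mut().enumerate() {
            slot.write(i as u32);
        }
        // SAFETY: every element was written in the loop above.
        unsafe { MaybeUninit::array_assume_init(buf) }
    }

    fn main() {
        assert_eq!(filled(), [0, 1, 2, 3]);
    }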
29 changes: 26 additions & 3 deletions tskv/src/memcache.rs
@@ -8,7 +8,9 @@ use flatbuffers::{ForwardsUOffset, Vector};
 use memory_pool::{MemoryConsumer, MemoryPoolRef, MemoryReservation};
 use minivec::{mini_vec, MiniVec};
 use models::predicate::domain::TimeRange;
-use models::schema::{timestamp_convert, Precision, TableColumn, TskvTableSchema};
+use models::schema::{
+    timestamp_convert, Precision, TableColumn, TskvTableSchema, TskvTableSchemaRef,
+};
 use models::utils::split_id;
 use models::{
     ColumnId, FieldId, PhysicalDType as ValueType, RwLockRef, SchemaId, SeriesId, Timestamp,
@@ -412,7 +414,7 @@ impl SeriesData {
         }
     }

-    pub fn flat_groups(&self) -> Vec<(SchemaId, Arc<TskvTableSchema>, &LinkedList<RowData>)> {
+    pub fn flat_groups(&self) -> Vec<(SchemaId, TskvTableSchemaRef, &LinkedList<RowData>)> {
         self.groups
             .iter()
             .map(|g| (g.schema.schema_id, g.schema.clone(), &g.rows))
@@ -619,7 +621,7 @@ impl MemCache {
     }
 }

-#[derive(Debug, Clone, PartialEq)]
+#[derive(Debug, Clone)]
 pub enum DataType {
     U64(i64, u64),
     I64(i64, i64),
@@ -628,6 +630,27 @@ pub enum DataType {
     Bool(i64, bool),
 }

+impl PartialEq for DataType {
+    fn eq(&self, other: &Self) -> bool {
+        self.timestamp().eq(&other.timestamp())
+    }
+}
+
+impl Eq for DataType {}
+
+impl PartialOrd for DataType {
+    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
+        Some(self.cmp(other))
+    }
+}
+
+/// Only care about timestamps when comparing
+impl Ord for DataType {
+    fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+        self.timestamp().cmp(&other.timestamp())
+    }
+}
+
 impl DataType {
     pub fn new(vtype: ValueType, ts: i64) -> Self {
         match vtype {
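The hand-written impls above make equality and ordering of DataType depend on the timestamp alone; the payload value is ignored, presumably so merge and dedup logic in the refactored iterator can treat rows with the same timestamp as duplicates. A self-contained stand-in with the same semantics (not the real tskv type):

    use std::cmp::Ordering;

    // Stand-in for tskv's DataType: each variant is a (timestamp, value) pair.
    #[derive(Debug, Clone)]
    enum Point {
        I64(i64, i64),
        F64(i64, f64),
    }

    impl Point {
        fn timestamp(&self) -> i64 {
            match self {
                Point::I64(ts, _) | Point::F64(ts, _) => *ts,
            }
        }
    }

    impl PartialEq for Point {
        fn eq(&self, other: &Self) -> bool {
            self.timestamp() == other.timestamp()
        }
    }

    impl Eq for Point {}

    impl PartialOrd for Point {
        fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
            Some(self.cmp(other))
        }
    }

    impl Ord for Point {
        fn cmp(&self, other: &Self) -> Ordering {
            self.timestamp().cmp(&other.timestamp())
        }
    }

    fn main() {
        let mut points = vec![Point::I64(3, 30), Point::F64(1, 9.5), Point::I64(2, -7)];
        points.sort(); // timestamp order: 1, 2, 3
        assert_eq!(points[0].timestamp(), 1);
        // Equality is timestamp-only: same ts, different value still compares equal.
        assert!(Point::I64(7, 1) == Point::I64(7, 2));
    }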