Skip to content

Commit

Permalink
feat(cubestore): Metrics - track command for data/cache/queue (#7430)
Browse files Browse the repository at this point in the history
  • Loading branch information
ovr committed Nov 20, 2023
1 parent f7859ca commit 91db103
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 5 deletions.
17 changes: 15 additions & 2 deletions rust/cubestore/cubestore/src/sql/cachestore.rs
Expand Up @@ -9,6 +9,7 @@ use crate::sql::parser::{
use crate::sql::{QueryPlans, SqlQueryContext, SqlService};
use crate::store::DataFrame;
use crate::table::{Row, TableValue};
use crate::util::metrics;
use crate::{app_metrics, CubeError};
use async_trait::async_trait;
use datafusion::sql::parser::Statement as DFStatement;
Expand Down Expand Up @@ -174,7 +175,13 @@ impl CacheStoreSqlService {
_context: SqlQueryContext,
command: CacheCommand,
) -> Result<Arc<DataFrame>, CubeError> {
app_metrics::CACHE_QUERIES.increment();
app_metrics::CACHE_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag(
"command",
command.as_tag_command(),
)]),
);
let execution_time = SystemTime::now();

let (result, track_time) = match command {
Expand Down Expand Up @@ -268,7 +275,13 @@ impl CacheStoreSqlService {
_context: SqlQueryContext,
command: QueueCommand,
) -> Result<Arc<DataFrame>, CubeError> {
app_metrics::QUEUE_QUERIES.increment();
app_metrics::QUEUE_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag(
"command",
command.as_tag_command(),
)]),
);
let execution_time = SystemTime::now();

let (result, track_time) = match command {
Expand Down
50 changes: 47 additions & 3 deletions rust/cubestore/cubestore/src/sql/mod.rs
Expand Up @@ -81,6 +81,7 @@ pub mod cachestore;
pub mod parser;

use crate::sql::cachestore::CacheStoreSqlService;
use crate::util::metrics;
use mockall::automock;

#[automock]
Expand Down Expand Up @@ -944,6 +945,11 @@ impl SqlService for SqlServiceImpl {
schema_name,
if_not_exists,
} => {
app_metrics::DATA_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag("command", "create_schema")]),
);

let name = schema_name.to_string();
let res = self.create_schema(name, if_not_exists).await?;
Ok(Arc::new(DataFrame::from(vec![res])))
Expand All @@ -963,6 +969,11 @@ impl SqlService for SqlServiceImpl {
unique_key,
partitioned_index,
} => {
app_metrics::DATA_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag("command", "create_table")]),
);

let nv = &name.0;
if nv.len() != 2 {
return Err(CubeError::user(format!(
Expand Down Expand Up @@ -1127,6 +1138,11 @@ impl SqlService for SqlServiceImpl {
columns,
..
}) => {
app_metrics::DATA_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag("command", "create_index")]),
);

if table_name.0.len() != 2 {
return Err(CubeError::user(format!(
"Schema's name should be present in table name but found: {}",
Expand Down Expand Up @@ -1163,6 +1179,11 @@ impl SqlService for SqlServiceImpl {
credentials,
or_update,
} => {
app_metrics::DATA_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag("command", "create_source")]),
);

if or_update {
let creds = match source_type.as_str() {
"ksql" => {
Expand Down Expand Up @@ -1209,6 +1230,14 @@ impl SqlService for SqlServiceImpl {
columns,
if_not_exists,
}) => {
app_metrics::DATA_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag(
"command",
"create_partitioned_index",
)]),
);

if name.0.len() != 2 {
return Err(CubeError::user(format!(
"Expected name for PARTITIONED INDEX in the form '<SCHEMA>.<INDEX>', found: {}",
Expand All @@ -1230,24 +1259,31 @@ impl SqlService for SqlServiceImpl {
CubeStoreStatement::Statement(Statement::Drop {
object_type, names, ..
}) => {
match object_type {
let command = match object_type {
ObjectType::Schema => {
self.db.delete_schema(names[0].to_string()).await?;
&"drop_schema"
}
ObjectType::Table => {
let table = self
.db
.get_table(names[0].0[0].to_string(), names[0].0[1].to_string())
.await?;
self.db.drop_table(table.get_id()).await?;
&"drop_table"
}
ObjectType::PartitionedIndex => {
let schema = names[0].0[0].value.clone();
let name = names[0].0[1].value.clone();
self.db.drop_partitioned_index(schema, name).await?;
&"drop_partitioned_index"
}
_ => return Err(CubeError::user("Unsupported drop operation".to_string())),
}
};

app_metrics::DATA_QUERIES
.add_with_tags(1, Some(&vec![metrics::format_tag("command", command)]));

Ok(Arc::new(DataFrame::new(vec![], vec![])))
}
CubeStoreStatement::Statement(Statement::Insert {
Expand All @@ -1256,6 +1292,9 @@ impl SqlService for SqlServiceImpl {
source,
..
}) => {
app_metrics::DATA_QUERIES
.add_with_tags(1, Some(&vec![metrics::format_tag("command", "insert")]));

let data = if let SetExpr::Values(Values(data_series)) = &source.body {
data_series
} else {
Expand Down Expand Up @@ -1295,14 +1334,19 @@ impl SqlService for SqlServiceImpl {
context.trace_obj.clone(),
)
.await?;

// TODO distribute and combine
let res = match logical_plan {
QueryPlan::Meta(logical_plan) => {
app_metrics::META_QUERIES.increment();
Arc::new(self.query_planner.execute_meta_plan(logical_plan).await?)
}
QueryPlan::Select(serialized, workers) => {
app_metrics::DATA_QUERIES.increment();
app_metrics::DATA_QUERIES.add_with_tags(
1,
Some(&vec![metrics::format_tag("command", "select")]),
);

let cluster = self.cluster.clone();
let executor = self.query_executor.clone();
timeout(
Expand Down
36 changes: 36 additions & 0 deletions rust/cubestore/cubestore/src/sql/parser.rs
Expand Up @@ -91,6 +91,19 @@ pub enum CacheCommand {
},
}

impl CacheCommand {
pub fn as_tag_command(&self) -> &'static str {
match self {
CacheCommand::Set { .. } => "set",
CacheCommand::Get { .. } => "get",
CacheCommand::Keys { .. } => "keys",
CacheCommand::Remove { .. } => "remove",
CacheCommand::Truncate { .. } => "truncate",
CacheCommand::Incr { .. } => "incr",
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum QueueCommand {
Add {
Expand Down Expand Up @@ -142,6 +155,29 @@ pub enum QueueCommand {
Truncate {},
}

impl QueueCommand {
pub fn as_tag_command(&self) -> &'static str {
match self {
QueueCommand::Add { .. } => "add",
QueueCommand::Get { .. } => "get",
QueueCommand::ToCancel { .. } => "to_cancel",
QueueCommand::List { status_filter, .. } => match status_filter {
Some(QueueItemStatus::Active) => "active",
Some(QueueItemStatus::Pending) => "pending",
_ => "list",
},
QueueCommand::Cancel { .. } => "cancel",
QueueCommand::Heartbeat { .. } => "heartbeat",
QueueCommand::Ack { .. } => "ack",
QueueCommand::MergeExtra { .. } => "merge_extra",
QueueCommand::Retrieve { .. } => "retrieve",
QueueCommand::Result { .. } => "result",
QueueCommand::ResultBlocking { .. } => "result_blocking",
QueueCommand::Truncate { .. } => "truncate",
}
}
}

#[derive(Debug, Clone, PartialEq)]
pub enum SystemCommand {
KillAllJobs,
Expand Down
4 changes: 4 additions & 0 deletions rust/cubestore/cubestore/src/util/metrics.rs
Expand Up @@ -61,6 +61,10 @@ pub const fn distribution(name: &'static str) -> IntMetric {
}
}

pub fn format_tag(name: &'static str, value: &str) -> String {
format!("{}:{}", name, value)
}

pub struct Counter {
metric: Metric,
}
Expand Down

0 comments on commit 91db103

Please sign in to comment.