Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: http header with metrics #3536

Merged
merged 11 commits into from
Mar 18, 2024
3 changes: 2 additions & 1 deletion src/common/function/src/handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use async_trait::async_trait;
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, InsertRequest};
Expand All @@ -26,7 +27,7 @@ use table::requests::{CompactTableRequest, DeleteRequest, FlushTableRequest, Ins
#[async_trait]
pub trait TableMutationHandler: Send + Sync {
/// Inserts rows into the table.
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
async fn insert(&self, request: InsertRequest, ctx: QueryContextRef) -> Result<Output>;

/// Delete rows from the table.
async fn delete(&self, request: DeleteRequest, ctx: QueryContextRef) -> Result<AffectedRows>;
Expand Down
5 changes: 3 additions & 2 deletions src/common/function/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ impl FunctionState {
use common_base::AffectedRows;
use common_meta::rpc::procedure::{MigrateRegionRequest, ProcedureStateResponse};
use common_query::error::Result;
use common_query::Output;
use session::context::QueryContextRef;
use store_api::storage::RegionId;
use table::requests::{
Expand Down Expand Up @@ -70,8 +71,8 @@ impl FunctionState {
&self,
_request: InsertRequest,
_ctx: QueryContextRef,
) -> Result<AffectedRows> {
Ok(ROWS)
) -> Result<Output> {
Ok(Output::new_with_affected_rows(ROWS))
}

async fn delete(
Expand Down
3 changes: 2 additions & 1 deletion src/common/plugins/src/consts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@
pub const GREPTIME_EXEC_PREFIX: &str = "greptime_exec_";

/// Execution cost metrics key
pub const GREPTIME_EXEC_COST: &str = "greptime_exec_cost";
pub const GREPTIME_EXEC_READ_COST: &str = "greptime_exec_read_cost";
pub const GREPTIME_EXEC_WRITE_COST: &str = "greptime_exec_write_cost";
2 changes: 1 addition & 1 deletion src/common/plugins/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,4 @@
/// since `plugins` crate is at the top depending on crates like `frontend` and `datanode`
mod consts;

pub use consts::{GREPTIME_EXEC_COST, GREPTIME_EXEC_PREFIX};
pub use consts::{GREPTIME_EXEC_PREFIX, GREPTIME_EXEC_READ_COST, GREPTIME_EXEC_WRITE_COST};
16 changes: 13 additions & 3 deletions src/common/query/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ pub struct Output {
/// Original Output struct
/// carrying result data to response/client/user interface
pub enum OutputData {
AffectedRows(usize),
AffectedRows(OutputRows),
RecordBatches(RecordBatches),
Stream(SendableRecordBatchStream),
}
Expand All @@ -50,11 +50,11 @@ pub enum OutputData {
pub struct OutputMeta {
/// May exist for query output. One can retrieve execution metrics from this plan.
pub plan: Option<Arc<dyn PhysicalPlan>>,
pub cost: usize,
pub cost: OutputCost,
}

impl Output {
pub fn new_with_affected_rows(affected_rows: usize) -> Self {
pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
Self {
data: OutputData::AffectedRows(affected_rows),
meta: Default::default(),
Expand All @@ -78,6 +78,13 @@ impl Output {
pub fn new(data: OutputData, meta: OutputMeta) -> Self {
Self { data, meta }
}

/// Returns the affected-row count paired with the cost stored in `meta`.
///
/// Only the `OutputData::AffectedRows` variant carries a row count; every
/// other variant reports `0` rows while still returning the recorded cost.
pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
    let cost = self.meta.cost;
    let affected = if let OutputData::AffectedRows(count) = self.data {
        count
    } else {
        0
    };
    (affected, cost)
}
}

impl Debug for OutputData {
Expand Down Expand Up @@ -133,3 +140,6 @@ impl From<&AddColumnLocation> for Location {
}
}
}

/// Row count carried by `OutputData::AffectedRows`.
pub type OutputRows = usize;
/// Cost value stored in `OutputMeta::cost`.
pub type OutputCost = usize;
9 changes: 4 additions & 5 deletions src/frontend/src/instance/influxdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use servers::error::{AuthSnafu, Error};
use servers::influxdb::InfluxdbRequest;
Expand All @@ -30,7 +31,7 @@ impl InfluxdbLineProtocolHandler for Instance {
&self,
request: InfluxdbRequest,
ctx: QueryContextRef,
) -> servers::error::Result<()> {
) -> servers::error::Result<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
Expand All @@ -41,11 +42,9 @@ impl InfluxdbLineProtocolHandler for Instance {
interceptor_ref.pre_execute(&request.lines, ctx.clone())?;

let requests = request.try_into()?;
let _ = self
.handle_row_inserts(requests, ctx)
self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(servers::error::ExecuteGrpcQuerySnafu)?;
Ok(())
.context(servers::error::ExecuteGrpcQuerySnafu)
}
}
43 changes: 13 additions & 30 deletions src/frontend/src/instance/otlp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,11 @@

use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use client::Output;
use common_error::ext::BoxedError;
use common_telemetry::tracing;
use opentelemetry_proto::tonic::collector::metrics::v1::{
ExportMetricsServiceRequest, ExportMetricsServiceResponse,
};
use opentelemetry_proto::tonic::collector::trace::v1::{
ExportTraceServiceRequest, ExportTraceServiceResponse,
};
use opentelemetry_proto::tonic::collector::metrics::v1::ExportMetricsServiceRequest;
use opentelemetry_proto::tonic::collector::trace::v1::ExportTraceServiceRequest;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::interceptor::{OpenTelemetryProtocolInterceptor, OpenTelemetryProtocolInterceptorRef};
use servers::otlp;
Expand All @@ -40,7 +37,7 @@ impl OpenTelemetryProtocolHandler for Instance {
&self,
request: ExportMetricsServiceRequest,
ctx: QueryContextRef,
) -> ServerResult<ExportMetricsServiceResponse> {
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
Expand All @@ -53,27 +50,20 @@ impl OpenTelemetryProtocolHandler for Instance {
interceptor_ref.pre_execute(ctx.clone())?;

let (requests, rows) = otlp::metrics::to_grpc_insert_requests(request)?;
let _ = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;

OTLP_METRICS_ROWS.inc_by(rows as u64);

let resp = ExportMetricsServiceResponse {
// TODO(sunng87): add support for partial_success in future patch
partial_success: None,
};
Ok(resp)
self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}

#[tracing::instrument(skip_all)]
async fn traces(
&self,
request: ExportTraceServiceRequest,
ctx: QueryContextRef,
) -> ServerResult<ExportTraceServiceResponse> {
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
Expand All @@ -95,18 +85,11 @@ impl OpenTelemetryProtocolHandler for Instance {

let (requests, rows) = otlp::trace::to_grpc_insert_requests(table_name, spans)?;

let _ = self
.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;

OTLP_TRACES_ROWS.inc_by(rows as u64);

let resp = ExportTraceServiceResponse {
// TODO(fys): add support for partial_success in future patch
partial_success: None,
};
Ok(resp)
self.handle_row_inserts(requests, ctx)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)
}
}
61 changes: 35 additions & 26 deletions src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
Expand All @@ -30,6 +31,7 @@ use operator::insert::InserterRef;
use operator::statement::StatementExecutor;
use prost::Message;
use servers::error::{self, AuthSnafu, Result as ServerResult};
use servers::http::header::{collect_plan_metrics, CONTENT_ENCODING_SNAPPY, CONTENT_TYPE_PROTOBUF};
use servers::http::prom_store::PHYSICAL_TABLE_PARAM;
use servers::interceptor::{PromStoreProtocolInterceptor, PromStoreProtocolInterceptorRef};
use servers::prom_store::{self, Metrics};
Expand Down Expand Up @@ -165,7 +167,7 @@ impl PromStoreProtocolHandler for Instance {
request: WriteRequest,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<()> {
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
Expand All @@ -177,58 +179,55 @@ impl PromStoreProtocolHandler for Instance {
interceptor_ref.pre_write(&request, ctx.clone())?;

let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
if with_metric_engine {
let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
let _ = self
.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
self.handle_metric_row_inserts(requests, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
.context(error::ExecuteGrpcQuerySnafu)?
} else {
let _ = self
.handle_row_inserts(requests, ctx.clone())
self.handle_row_inserts(requests, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
}
.context(error::ExecuteGrpcQuerySnafu)?
};

PROM_STORE_REMOTE_WRITE_SAMPLES.inc_by(samples as u64);
Ok(())
Ok(output)
}

async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<()> {
) -> ServerResult<Output> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;

if with_metric_engine {
let output = if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
let _ = self
.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
self.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
.context(error::ExecuteGrpcQuerySnafu)?
} else {
let _ = self
.handle_row_inserts(request, ctx.clone())
self.handle_row_inserts(request, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
}
Ok(())
.context(error::ExecuteGrpcQuerySnafu)?
};

Ok(output)
}

async fn read(
Expand All @@ -254,18 +253,29 @@ impl PromStoreProtocolHandler for Instance {
match response_type {
ResponseType::Samples => {
let mut query_results = Vec::with_capacity(results.len());
let mut map = HashMap::new();
for (table_name, output) in results {
let plan = output.meta.plan.clone();
query_results.push(to_query_result(&table_name, output).await?);
if let Some(ref plan) = plan {
collect_plan_metrics(plan.clone(), &mut [&mut map]);
}
}

let response = ReadResponse {
results: query_results,
};

let resp_metrics = map
.into_iter()
.map(|(k, v)| (k, v.into()))
.collect::<HashMap<_, _>>();

// TODO(dennis): may consume too much memory; add flow control
Ok(PromStoreResponse {
content_type: "application/x-protobuf".to_string(),
content_encoding: "snappy".to_string(),
content_type: CONTENT_TYPE_PROTOBUF.clone(),
content_encoding: CONTENT_ENCODING_SNAPPY.clone(),
resp_metrics,
body: prom_store::snappy_compress(&response.encode_to_vec())?,
})
}
Expand Down Expand Up @@ -309,7 +319,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
request: WriteRequest,
ctx: QueryContextRef,
_: bool,
) -> ServerResult<()> {
) -> ServerResult<Output> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
self.inserter
.handle_metric_row_inserts(
Expand All @@ -320,16 +330,15 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
)
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
Ok(())
.context(error::ExecuteGrpcQuerySnafu)
}

async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> ServerResult<()> {
) -> ServerResult<Output> {
unimplemented!()
}

Expand Down