Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: decode prom requests to grpc #3425

Merged
merged 19 commits into from
Mar 5, 2024
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 44 additions & 2 deletions src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_catalog::format_full_table_name;
Expand Down Expand Up @@ -167,7 +168,7 @@ impl PromStoreProtocolHandler for Instance {
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;

let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
Expand All @@ -190,6 +191,38 @@ impl PromStoreProtocolHandler for Instance {
Ok(())
}

async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<()> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;

if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
let _ = self
.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
} else {
let _ = self
.handle_row_inserts(request, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
}
Ok(())
}

async fn read(
&self,
request: ReadRequest,
Expand Down Expand Up @@ -265,7 +298,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
ctx: QueryContextRef,
_: bool,
) -> ServerResult<()> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
self.inserter
.handle_metric_row_inserts(
requests,
Expand All @@ -279,6 +312,15 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
Ok(())
}

async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> ServerResult<()> {
unimplemented!()
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
}

async fn read(
&self,
_request: ReadRequest,
Expand Down
6 changes: 6 additions & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ itertools.workspace = true
lazy_static.workspace = true
mime_guess = "2.0"
notify = "6.1"
object-pool = "0.5"
once_cell.workspace = true
openmetrics-parser = "0.4"
opensrv-mysql = "0.7.0"
Expand Down Expand Up @@ -114,6 +115,7 @@ catalog = { workspace = true, features = ["testing"] }
client.workspace = true
common-base.workspace = true
common-test-util.workspace = true
criterion = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
Expand All @@ -129,3 +131,7 @@ tokio-test = "0.4"

[build-dependencies]
common-version.workspace = true

[[bench]]
name = "bench_prom"
harness = false
21 changes: 21 additions & 0 deletions src/servers/benches/bench_prom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::criterion_main;

mod prom_decode;

criterion_main! {
prom_decode::benches
}
53 changes: 53 additions & 0 deletions src/servers/benches/prom_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use api::prom_store::remote::WriteRequest;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use prost::Message;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::proto::PromWriteRequest;

fn bench_decode_prom_request(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");

let data = Bytes::from(std::fs::read(d).unwrap());

let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
b.iter(|| {
request.clear();
let data = data.clone();
request.merge(data).unwrap();
to_grpc_row_insert_requests(&request).unwrap();
});
})
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data).unwrap();
prom_request.as_row_insert_requests();
});
});
}

criterion_group!(benches, bench_decode_prom_request);
criterion_main!(benches);
Binary file added src/servers/benches/write_request.pb.data
Binary file not shown.
31 changes: 29 additions & 2 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,17 @@
use std::sync::Arc;

use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use bytes::Bytes;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use hyper::Body;
use lazy_static::lazy_static;
use object_pool::Pool;
use prost::Message;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand All @@ -30,9 +34,14 @@ use snafu::prelude::*;

use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::prom_store::snappy_decompress;
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};

pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
lazy_static! {
static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest> =
Pool::new(256, PromWriteRequest::default);
v0y4g3r marked this conversation as resolved.
Show resolved Hide resolved
}

#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
Expand Down Expand Up @@ -86,14 +95,15 @@ pub async fn remote_write(
.with_label_values(&[db.as_str()])
.start_timer();

let request = decode_remote_write_request(body).await?;
let request = decode_remote_write_request_to_row_inserts(body).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

handler.write(request, query_ctx, true).await?;
handler.write_fast(request, query_ctx, true).await?;
Ok((StatusCode::NO_CONTENT, ()))
}

Expand Down Expand Up @@ -127,6 +137,23 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}

async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowInsertRequests> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(snappy_decompress(&body[..])?);

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf)
.context(error::DecodePromRemoteRequestSnafu)?;
let (requests, samples) = request.as_row_insert_requests();
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(samples as f64);
Ok(requests)
}

async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
Expand Down
4 changes: 4 additions & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ pub mod mysql;
pub mod opentsdb;
pub mod otlp;
pub mod postgres;
mod prom_row_builder;
pub mod prom_store;
pub mod prometheus_handler;
pub mod proto;
pub mod query_handler;
#[allow(clippy::all)]
mod repeated_field;
mod row_writer;
pub mod server;
mod shutdown;
Expand Down
Loading