Skip to content

Commit

Permalink
feat: decode prom requests to grpc (#3425)
Browse files Browse the repository at this point in the history
* hack: inline decode

* move to servers

* fix: samples lost

* add bench

* remove useless functions

* wip

* feat: remove object pools

* fix: minor issues

* fix: remove useless dep

* chore: rebase main

* format

* finish

* fix: format

* feat: introduce request pool

* try to fix license issue

* fix: clippy

* resolve comments

* fix:typo

* remove useless comments
  • Loading branch information
v0y4g3r committed Mar 5, 2024
1 parent 7b1c350 commit 02b18fb
Show file tree
Hide file tree
Showing 14 changed files with 1,318 additions and 25 deletions.
11 changes: 11 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

46 changes: 44 additions & 2 deletions src/frontend/src/instance/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::sync::Arc;

use api::prom_store::remote::read_request::ResponseType;
use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest};
use api::v1::RowInsertRequests;
use async_trait::async_trait;
use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq};
use common_catalog::format_full_table_name;
Expand Down Expand Up @@ -174,7 +175,7 @@ impl PromStoreProtocolHandler for Instance {
.get::<PromStoreProtocolInterceptorRef<servers::error::Error>>();
interceptor_ref.pre_write(&request, ctx.clone())?;

let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?;
let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?;
if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
Expand All @@ -197,6 +198,38 @@ impl PromStoreProtocolHandler for Instance {
Ok(())
}

async fn write_fast(
&self,
request: RowInsertRequests,
ctx: QueryContextRef,
with_metric_engine: bool,
) -> ServerResult<()> {
self.plugins
.get::<PermissionCheckerRef>()
.as_ref()
.check_permission(ctx.current_user(), PermissionReq::PromStoreWrite)
.context(AuthSnafu)?;

if with_metric_engine {
let physical_table = ctx
.extension(PHYSICAL_TABLE_PARAM)
.unwrap_or(GREPTIME_PHYSICAL_TABLE)
.to_string();
let _ = self
.handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
} else {
let _ = self
.handle_row_inserts(request, ctx.clone())
.await
.map_err(BoxedError::new)
.context(error::ExecuteGrpcQuerySnafu)?;
}
Ok(())
}

async fn read(
&self,
request: ReadRequest,
Expand Down Expand Up @@ -276,7 +309,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
ctx: QueryContextRef,
_: bool,
) -> ServerResult<()> {
let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?;
let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?;
self.inserter
.handle_metric_row_inserts(
requests,
Expand All @@ -290,6 +323,15 @@ impl PromStoreProtocolHandler for ExportMetricHandler {
Ok(())
}

async fn write_fast(
&self,
_request: RowInsertRequests,
_ctx: QueryContextRef,
_with_metric_engine: bool,
) -> ServerResult<()> {
unimplemented!()
}

async fn read(
&self,
_request: ReadRequest,
Expand Down
6 changes: 6 additions & 0 deletions src/servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ itertools.workspace = true
lazy_static.workspace = true
mime_guess = "2.0"
notify = "6.1"
object-pool = "0.5"
once_cell.workspace = true
openmetrics-parser = "0.4"
opensrv-mysql = "0.7.0"
Expand Down Expand Up @@ -114,6 +115,7 @@ catalog = { workspace = true, features = ["testing"] }
client.workspace = true
common-base.workspace = true
common-test-util.workspace = true
criterion = "0.4"
mysql_async = { version = "0.33", default-features = false, features = [
"default-rustls",
] }
Expand All @@ -129,3 +131,7 @@ tokio-test = "0.4"

[build-dependencies]
common-version.workspace = true

[[bench]]
name = "bench_prom"
harness = false
21 changes: 21 additions & 0 deletions src/servers/benches/bench_prom.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use criterion::criterion_main;

mod prom_decode;

criterion_main! {
prom_decode::benches
}
53 changes: 53 additions & 0 deletions src/servers/benches/prom_decode.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::time::Duration;

use api::prom_store::remote::WriteRequest;
use bytes::Bytes;
use criterion::{criterion_group, criterion_main, Criterion};
use prost::Message;
use servers::prom_store::to_grpc_row_insert_requests;
use servers::proto::PromWriteRequest;

fn bench_decode_prom_request(c: &mut Criterion) {
let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"));
d.push("benches");
d.push("write_request.pb.data");

let data = Bytes::from(std::fs::read(d).unwrap());

let mut request = WriteRequest::default();
let mut prom_request = PromWriteRequest::default();
c.benchmark_group("decode")
.measurement_time(Duration::from_secs(3))
.bench_function("write_request", |b| {
b.iter(|| {
request.clear();
let data = data.clone();
request.merge(data).unwrap();
to_grpc_row_insert_requests(&request).unwrap();
});
})
.bench_function("prom_write_request", |b| {
b.iter(|| {
let data = data.clone();
prom_request.merge(data).unwrap();
prom_request.as_row_insert_requests();
});
});
}

criterion_group!(benches, bench_decode_prom_request);
criterion_main!(benches);
Binary file added src/servers/benches/write_request.pb.data
Binary file not shown.
31 changes: 29 additions & 2 deletions src/servers/src/http/prom_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
use std::sync::Arc;

use api::prom_store::remote::{ReadRequest, WriteRequest};
use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use bytes::Bytes;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
use common_telemetry::tracing;
use hyper::Body;
use lazy_static::lazy_static;
use object_pool::Pool;
use prost::Message;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
Expand All @@ -31,9 +35,14 @@ use snafu::prelude::*;

use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::prom_store::snappy_decompress;
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};

pub const PHYSICAL_TABLE_PARAM: &str = "physical_table";
lazy_static! {
static ref PROM_WRITE_REQUEST_POOL: Pool<PromWriteRequest> =
Pool::new(256, PromWriteRequest::default);
}

#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
Expand Down Expand Up @@ -91,14 +100,15 @@ pub async fn remote_write(
.with_label_values(&[db.as_str()])
.start_timer();

let request = decode_remote_write_request(body).await?;
let request = decode_remote_write_request_to_row_inserts(body).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table);
query_ctx = Arc::new(new_query_ctx);
}

handler.write(request, query_ctx, true).await?;
handler.write_fast(request, query_ctx, true).await?;
Ok((StatusCode::NO_CONTENT, ()))
}

Expand Down Expand Up @@ -136,6 +146,23 @@ pub async fn remote_read(
handler.read(request, query_ctx).await
}

async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result<RowInsertRequests> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(snappy_decompress(&body[..])?);

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
.merge(buf)
.context(error::DecodePromRemoteRequestSnafu)?;
let (requests, samples) = request.as_row_insert_requests();
crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(samples as f64);
Ok(requests)
}

async fn decode_remote_write_request(body: Body) -> Result<WriteRequest> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
Expand Down
4 changes: 4 additions & 0 deletions src/servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ pub mod mysql;
pub mod opentsdb;
pub mod otlp;
pub mod postgres;
mod prom_row_builder;
pub mod prom_store;
pub mod prometheus_handler;
pub mod proto;
pub mod query_handler;
#[allow(clippy::all)]
mod repeated_field;
mod row_writer;
pub mod server;
mod shutdown;
Expand Down

0 comments on commit 02b18fb

Please sign in to comment.