feat: adding victoriametrics remote write #3641

Merged (2 commits) on Apr 7, 2024
Changes from all commits
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -155,6 +155,7 @@ tokio-util = { version = "0.7", features = ["io-util", "compat"] }
toml = "0.8.8"
tonic = { version = "0.10", features = ["tls"] }
uuid = { version = "1", features = ["serde", "v4", "fast-rng"] }
zstd = "0.13"

## workspaces members
api = { path = "src/api" }
1 change: 1 addition & 0 deletions src/servers/Cargo.toml
@@ -102,6 +102,7 @@ tonic-reflection = "0.10"
tower = { version = "0.4", features = ["full"] }
tower-http = { version = "0.4", features = ["full"] }
urlencoding = "2.1"
zstd.workspace = true

[target.'cfg(not(windows))'.dependencies]
tikv-jemalloc-ctl = { version = "0.5", features = ["use_std"] }
17 changes: 13 additions & 4 deletions src/servers/src/error.rs
@@ -219,13 +219,20 @@ pub enum Error {
error: prost::DecodeError,
},

#[snafu(display("Failed to decompress prometheus remote request"))]
DecompressPromRemoteRequest {
#[snafu(display("Failed to decompress snappy prometheus remote request"))]
DecompressSnappyPromRemoteRequest {
location: Location,
#[snafu(source)]
error: snap::Error,
},

#[snafu(display("Failed to decompress zstd prometheus remote request"))]
DecompressZstdPromRemoteRequest {
location: Location,
#[snafu(source)]
error: std::io::Error,
},

#[snafu(display("Failed to send prometheus remote request"))]
SendPromRemoteRequest {
location: Location,
@@ -504,7 +511,8 @@ impl ErrorExt for Error {
| DecodePromRemoteRequest { .. }
| DecodeOtlpRequest { .. }
| CompressPromRemoteRequest { .. }
| DecompressPromRemoteRequest { .. }
| DecompressSnappyPromRemoteRequest { .. }
| DecompressZstdPromRemoteRequest { .. }
| InvalidPromRemoteRequest { .. }
| InvalidExportMetricsConfig { .. }
| InvalidFlightTicket { .. }
@@ -657,7 +665,8 @@ impl IntoResponse for Error {
| Error::InvalidOpentsdbJsonRequest { .. }
| Error::DecodePromRemoteRequest { .. }
| Error::DecodeOtlpRequest { .. }
| Error::DecompressPromRemoteRequest { .. }
| Error::DecompressSnappyPromRemoteRequest { .. }
| Error::DecompressZstdPromRemoteRequest { .. }
| Error::InvalidPromRemoteRequest { .. }
| Error::InvalidQuery { .. }
| Error::TimePrecision { .. } => HttpStatusCode::BAD_REQUEST,
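
The hunks above split the single decompression error into codec-specific variants. As a rough illustration of the snafu pattern involved — hypothetical names, no `Location` field — the generated context selector for each variant is what `.context(...)` attaches at the call site:

```rust
// A minimal sketch (hypothetical names) of the snafu pattern used above:
// one error variant per codec, each wrapping that codec's native error type.
use snafu::{ResultExt, Snafu};

#[derive(Debug, Snafu)]
enum DecompressError {
    #[snafu(display("Failed to decompress snappy prometheus remote request"))]
    Snappy { source: snap::Error },

    #[snafu(display("Failed to decompress zstd prometheus remote request"))]
    Zstd { source: std::io::Error },
}

// Mirrors the shape of the crate's decompression helpers: `.context(...)`
// converts the codec's error into the matching variant.
fn decompress(is_zstd: bool, buf: &[u8]) -> Result<Vec<u8>, DecompressError> {
    if is_zstd {
        // zstd::stream::decode_all reads any `impl Read` and returns io::Result<Vec<u8>>.
        zstd::stream::decode_all(buf).context(ZstdSnafu)
    } else {
        // snap's raw decoder returns Result<Vec<u8>, snap::Error>.
        snap::raw::Decoder::new().decompress_vec(buf).context(SnappySnafu)
    }
}
```
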
65 changes: 50 additions & 15 deletions src/servers/src/http/prom_store.rs
@@ -19,7 +19,7 @@ use api::v1::RowInsertRequests;
use axum::extract::{Query, RawBody, State};
use axum::http::{header, HeaderValue, StatusCode};
use axum::response::IntoResponse;
use axum::Extension;
use axum::{Extension, TypedHeader};
use bytes::Bytes;
use common_catalog::consts::DEFAULT_SCHEMA_NAME;
use common_query::prelude::GREPTIME_PHYSICAL_TABLE;
@@ -35,7 +35,7 @@ use snafu::prelude::*;

use super::header::{write_cost_header_map, GREPTIME_DB_HEADER_METRICS};
use crate::error::{self, Result, UnexpectedPhysicalTableSnafu};
use crate::prom_store::snappy_decompress;
use crate::prom_store::{snappy_decompress, zstd_decompress};
use crate::proto::PromWriteRequest;
use crate::query_handler::{PromStoreProtocolHandlerRef, PromStoreResponse};

@@ -45,19 +45,26 @@ lazy_static! {
Pool::new(256, PromWriteRequest::default);
}

pub const DEFAULT_ENCODING: &str = "snappy";
pub const VM_ENCODING: &str = "zstd";
pub const VM_PROTO_VERSION: &str = "1";

#[derive(Debug, Serialize, Deserialize, JsonSchema)]
pub struct DatabaseQuery {
pub struct RemoteWriteQuery {
pub db: Option<String>,
/// Specify which physical table to use for storing metrics.
/// This only works on remote write requests.
pub physical_table: Option<String>,
/// For VictoriaMetrics modified remote write protocol
pub get_vm_proto_version: Option<String>,
}

impl Default for DatabaseQuery {
fn default() -> DatabaseQuery {
impl Default for RemoteWriteQuery {
fn default() -> RemoteWriteQuery {
Self {
db: Some(DEFAULT_SCHEMA_NAME.to_string()),
physical_table: Some(GREPTIME_PHYSICAL_TABLE.to_string()),
get_vm_proto_version: None,
}
}
}
@@ -66,16 +73,23 @@ impl Default for DatabaseQuery {
#[axum_macros::debug_handler]
pub async fn route_write_without_metric_engine(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let (request, samples) = decode_remote_write_request(body).await?;
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request(is_zstd, body).await?;
// reject if physical table is specified when metric engine is disabled
if params.physical_table.is_some() {
return UnexpectedPhysicalTableSnafu {}.fail();
@@ -86,7 +100,8 @@ pub async fn route_write_without_metric_engine(
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
))
)
.into_response())
}

#[axum_macros::debug_handler]
@@ -96,16 +111,23 @@ pub async fn route_write_without_metric_engine(
)]
pub async fn remote_write(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
Query(params): Query<RemoteWriteQuery>,
Extension(mut query_ctx): Extension<QueryContextRef>,
content_encoding: TypedHeader<headers::ContentEncoding>,
RawBody(body): RawBody,
) -> Result<impl IntoResponse> {
// VictoriaMetrics handshake
if let Some(_vm_handshake) = params.get_vm_proto_version {
return Ok(VM_PROTO_VERSION.into_response());
}

let db = params.db.clone().unwrap_or_default();
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_WRITE_ELAPSED
.with_label_values(&[db.as_str()])
.start_timer();

let (request, samples) = decode_remote_write_request_to_row_inserts(body).await?;
let is_zstd = content_encoding.contains(VM_ENCODING);
let (request, samples) = decode_remote_write_request_to_row_inserts(is_zstd, body).await?;

if let Some(physical_table) = params.physical_table {
let mut new_query_ctx = query_ctx.as_ref().clone();
@@ -118,7 +140,8 @@ pub async fn remote_write(
Ok((
StatusCode::NO_CONTENT,
write_cost_header_map(output.meta.cost),
))
)
.into_response())
}

impl IntoResponse for PromStoreResponse {
@@ -147,7 +170,7 @@ impl IntoResponse for PromStoreResponse {
)]
pub async fn remote_read(
State(handler): State<PromStoreProtocolHandlerRef>,
Query(params): Query<DatabaseQuery>,
Query(params): Query<RemoteWriteQuery>,
Extension(query_ctx): Extension<QueryContextRef>,
RawBody(body): RawBody,
) -> Result<PromStoreResponse> {
@@ -162,14 +185,19 @@ pub async fn remote_read(
}

async fn decode_remote_write_request_to_row_inserts(
is_zstd: bool,
body: Body,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(snappy_decompress(&body[..])?);
let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
} else {
snappy_decompress(&body[..])?
});

let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default);
request
@@ -178,13 +206,20 @@ async fn decode_remote_write_request_to_row_inserts(
Ok(request.as_row_insert_requests())
}

async fn decode_remote_write_request(body: Body) -> Result<(RowInsertRequests, usize)> {
async fn decode_remote_write_request(
is_zstd: bool,
body: Body,
) -> Result<(RowInsertRequests, usize)> {
let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer();
let body = hyper::body::to_bytes(body)
.await
.context(error::HyperSnafu)?;

let buf = Bytes::from(snappy_decompress(&body[..])?);
let buf = Bytes::from(if is_zstd {
zstd_decompress(&body[..])?
} else {
snappy_decompress(&body[..])?
});

let mut request = PromWriteRequest::default();
request
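
Taken together, the handler changes above do three things: answer the VictoriaMetrics handshake (`?get_vm_proto_version=1`) with the supported protocol version, pick a decompressor based on the `Content-Encoding` header, and hand the decompressed bytes to the shared protobuf decoder. A framework-free sketch of that dispatch decision — hypothetical names alongside the constants introduced in this PR — might look like this:

```rust
// Hypothetical sketch of the request-dispatch decision, detached from axum.
// VM_PROTO_VERSION / VM_ENCODING mirror the constants added in this PR.
const VM_PROTO_VERSION: &str = "1";
const VM_ENCODING: &str = "zstd";

enum RemoteWriteAction {
    /// Reply immediately with the supported vmagent protocol version.
    Handshake(&'static str),
    /// Decompress the body with zstd (VictoriaMetrics) or snappy (Prometheus).
    Decode { is_zstd: bool },
}

fn classify(get_vm_proto_version: Option<&str>, content_encoding: &str) -> RemoteWriteAction {
    if get_vm_proto_version.is_some() {
        RemoteWriteAction::Handshake(VM_PROTO_VERSION)
    } else {
        RemoteWriteAction::Decode {
            is_zstd: content_encoding.contains(VM_ENCODING),
        }
    }
}

fn main() {
    // vmagent handshake: POST /v1/prometheus/write?get_vm_proto_version=1
    assert!(matches!(classify(Some("1"), ""), RemoteWriteAction::Handshake("1")));
    // vmagent write: zstd-compressed body
    assert!(matches!(classify(None, "zstd"), RemoteWriteAction::Decode { is_zstd: true }));
    // stock Prometheus write: snappy-compressed body
    assert!(matches!(classify(None, "snappy"), RemoteWriteAction::Decode { is_zstd: false }));
}
```
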
7 changes: 6 additions & 1 deletion src/servers/src/prom_store.rs
@@ -389,7 +389,7 @@ pub fn snappy_decompress(buf: &[u8]) -> Result<Vec<u8>> {
let mut decoder = Decoder::new();
decoder
.decompress_vec(buf)
.context(error::DecompressPromRemoteRequestSnafu)
.context(error::DecompressSnappyPromRemoteRequestSnafu)
}

#[inline]
@@ -400,6 +400,11 @@ pub fn snappy_compress(buf: &[u8]) -> Result<Vec<u8>> {
.context(error::CompressPromRemoteRequestSnafu)
}

#[inline]
pub fn zstd_decompress(buf: &[u8]) -> Result<Vec<u8>> {
zstd::stream::decode_all(buf).context(error::DecompressZstdPromRemoteRequestSnafu)
}

/// Mock timeseries for tests; it is used in both the servers and frontend crates,
/// so it is defined here.
pub fn mock_timeseries() -> Vec<TimeSeries> {
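
The new `zstd_decompress` helper is the inverse of `zstd::stream::encode_all`, which the integration test below uses to build its request body. A standalone round-trip sketch (compression level 1, as in the test) shows the expected behavior:

```rust
// Standalone round-trip sketch: compress with zstd level 1 (as the integration
// test does), then decompress the way the new zstd_decompress helper does.
fn main() -> std::io::Result<()> {
    let payload = b"prometheus remote write body".to_vec();

    // zstd::stream::encode_all(reader, level) -> io::Result<Vec<u8>>
    let compressed = zstd::stream::encode_all(&payload[..], 1)?;

    // zstd::stream::decode_all(reader) -> io::Result<Vec<u8>>
    let decompressed = zstd::stream::decode_all(&compressed[..])?;

    assert_eq!(decompressed, payload);
    Ok(())
}
```
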
2 changes: 2 additions & 0 deletions tests-integration/Cargo.toml
@@ -46,6 +46,7 @@ mysql_async = { version = "0.33", default-features = false, features = [
] }
object-store.workspace = true
operator.workspace = true
prost.workspace = true
query.workspace = true
rstest = "0.17"
rstest_reuse = "0.5"
@@ -68,6 +69,7 @@ tokio.workspace = true
tonic.workspace = true
tower = "0.4"
uuid.workspace = true
zstd.workspace = true

[dev-dependencies]
datafusion.workspace = true
65 changes: 65 additions & 0 deletions tests-integration/tests/http.rs
@@ -14,10 +14,12 @@

use std::collections::BTreeMap;

use api::prom_store::remote::WriteRequest;
use auth::user_provider_from_option;
use axum::http::{HeaderName, StatusCode};
use axum_test_helper::TestClient;
use common_error::status_code::StatusCode as ErrorCode;
use prost::Message;
use serde_json::json;
use servers::http::error_result::ErrorResponse;
use servers::http::greptime_result_v1::GreptimedbV1Response;
@@ -26,6 +28,7 @@ use servers::http::header::GREPTIME_TIMEZONE_HEADER_NAME;
use servers::http::influxdb_result_v1::{InfluxdbOutput, InfluxdbV1Response};
use servers::http::prometheus::{PrometheusJsonResponse, PrometheusResponse};
use servers::http::GreptimeQueryOutput;
use servers::prom_store;
use tests_integration::test_util::{
setup_test_http_app, setup_test_http_app_with_frontend,
setup_test_http_app_with_frontend_and_user_provider, setup_test_prom_app_with_frontend,
@@ -71,6 +74,8 @@ macro_rules! http_tests {
test_status_api,
test_config_api,
test_dashboard_path,
test_prometheus_remote_write,
test_vm_proto_remote_write,
);
)*
};
@@ -896,3 +901,63 @@ pub async fn test_dashboard_path(store_type: StorageType) {

#[cfg(not(feature = "dashboard"))]
pub async fn test_dashboard_path(_: StorageType) {}

pub async fn test_prometheus_remote_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_prom_app_with_frontend(store_type, "prometheus_remote_write").await;
let client = TestClient::new(app);

// write snappy encoded data
let write_request = WriteRequest {
timeseries: prom_store::mock_timeseries(),
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
prom_store::snappy_compress(&serialized_request).expect("failed to encode snappy");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "snappy")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

guard.remove_all().await;
}

pub async fn test_vm_proto_remote_write(store_type: StorageType) {
common_telemetry::init_default_ut_logging();
let (app, mut guard) =
setup_test_prom_app_with_frontend(store_type, "vm_proto_remote_write").await;

// handshake
let client = TestClient::new(app);
let res = client
.post("/v1/prometheus/write?get_vm_proto_version=1")
.send()
.await;
assert_eq!(res.status(), StatusCode::OK);
assert_eq!(res.text().await, "1");

// write zstd encoded data
let write_request = WriteRequest {
timeseries: prom_store::mock_timeseries(),
..Default::default()
};
let serialized_request = write_request.encode_to_vec();
let compressed_request =
zstd::stream::encode_all(&serialized_request[..], 1).expect("Failed to encode zstd");

let res = client
.post("/v1/prometheus/write")
.header("Content-Encoding", "zstd")
.body(compressed_request)
.send()
.await;
assert_eq!(res.status(), StatusCode::NO_CONTENT);

guard.remove_all().await;
}
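
For reference, a client driving the same flow as `test_vm_proto_remote_write` — handshake first, then a zstd-compressed write — could be sketched as follows. This is a hedged sketch: `reqwest` and the surrounding function are assumptions, while the endpoint path, query parameter, and `Content-Encoding` header come from the test itself.

```rust
// Hypothetical client sketch for the VictoriaMetrics-flavored remote write flow
// exercised by the test above. Assumes the reqwest and zstd crates.
async fn vm_remote_write(
    base_url: &str,
    encoded_write_request: &[u8], // protobuf-encoded WriteRequest bytes
) -> Result<(), Box<dyn std::error::Error>> {
    let client = reqwest::Client::new();

    // 1. Handshake: the server replies with the protocol version it supports ("1").
    let version = client
        .post(format!("{base_url}/v1/prometheus/write"))
        .query(&[("get_vm_proto_version", "1")])
        .send()
        .await?
        .text()
        .await?;
    assert_eq!(version, "1");

    // 2. Write: zstd-compress the protobuf payload and flag it in Content-Encoding.
    let body = zstd::stream::encode_all(encoded_write_request, 1)?;
    client
        .post(format!("{base_url}/v1/prometheus/write"))
        .header("Content-Encoding", "zstd")
        .body(body)
        .send()
        .await?
        .error_for_status()?;

    Ok(())
}
```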