-
Notifications
You must be signed in to change notification settings - Fork 223
feat: support opentsdb put api #1037
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
2cf23c9
feat: support opentsdb put api
zouxiang1993 49f5457
draft
zouxiang1993 cc9af04
Merge branch 'main' into opentsdb-put-api
zouxiang1993 2ce7cdd
draft
zouxiang1993 d634dc0
draft
zouxiang1993 004d237
remove StringValue from Value enum
zouxiang1993 fc2c3f4
delete a comment line
zouxiang1993 549abe6
fix CR
zouxiang1993 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,50 @@ | ||
| // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. | ||
|
|
||
| //! This module implements [put][1] for OpenTSDB | ||
| //! [1]: http://opentsdb.net/docs/build/html/api_http/put.html | ||
|
|
||
| use ceresdbproto::storage::{ | ||
| RequestContext as GrpcRequestContext, WriteRequest as GrpcWriteRequest, | ||
| }; | ||
| use log::debug; | ||
| use query_engine::executor::Executor as QueryExecutor; | ||
|
|
||
| use crate::{ | ||
| context::RequestContext, | ||
| error::Result, | ||
| opentsdb::types::{convert_put_request, PutRequest, PutResponse}, | ||
| Context, Proxy, | ||
| }; | ||
|
|
||
| pub mod types; | ||
|
|
||
| impl<Q: QueryExecutor + 'static> Proxy<Q> { | ||
| pub async fn handle_opentsdb_put( | ||
| &self, | ||
| ctx: RequestContext, | ||
| req: PutRequest, | ||
| ) -> Result<PutResponse> { | ||
| let table_request = GrpcWriteRequest { | ||
| context: Some(GrpcRequestContext { | ||
| database: ctx.schema.clone(), | ||
| }), | ||
| table_requests: convert_put_request(req)?, | ||
| }; | ||
| let proxy_context = Context { | ||
| timeout: ctx.timeout, | ||
| runtime: self.engine_runtimes.write_runtime.clone(), | ||
| enable_partition_table_access: false, | ||
| forwarded_from: None, | ||
| }; | ||
| let result = self | ||
| .handle_write_internal(proxy_context, table_request) | ||
| .await?; | ||
|
|
||
| debug!( | ||
| "OpenTSDB write finished, catalog:{}, schema:{}, result:{result:?}", | ||
| ctx.catalog, ctx.schema | ||
| ); | ||
|
|
||
| Ok(()) | ||
| } | ||
| } |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,199 @@ | ||
| // Copyright 2023 CeresDB Project Authors. Licensed under Apache-2.0. | ||
|
|
||
| use std::{ | ||
| collections::{HashMap, HashSet}, | ||
| fmt::Debug, | ||
| }; | ||
|
|
||
| use bytes::Bytes; | ||
| use ceresdbproto::storage::{ | ||
| value, Field, FieldGroup, Tag, Value as ProtoValue, WriteSeriesEntry, WriteTableRequest, | ||
| }; | ||
| use common_util::{error::BoxError, time::try_to_millis}; | ||
| use http::StatusCode; | ||
| use serde::Deserialize; | ||
| use serde_json::from_slice; | ||
| use snafu::{OptionExt, ResultExt}; | ||
|
|
||
| use crate::error::{ErrNoCause, ErrWithCause, Result}; | ||
|
|
||
| const OPENTSDB_DEFAULT_FIELD: &str = "value"; | ||
|
|
||
| #[derive(Debug)] | ||
| pub struct PutRequest { | ||
| pub points: Bytes, | ||
|
|
||
| pub summary: Option<String>, | ||
| pub details: Option<String>, | ||
| pub sync: Option<String>, | ||
| pub sync_timeout: i32, | ||
| } | ||
|
|
||
| impl PutRequest { | ||
| pub fn new(points: Bytes, params: PutParams) -> Self { | ||
| PutRequest { | ||
| points, | ||
| summary: params.summary, | ||
| details: params.details, | ||
| sync: params.sync, | ||
| sync_timeout: params.sync_timeout, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| pub type PutResponse = (); | ||
|
|
||
| /// Query string parameters for put api | ||
| /// | ||
| /// It's derived from query string parameters of put described in | ||
| /// doc of OpenTSDB 2.4: | ||
| /// http://opentsdb.net/docs/build/html/api_http/put.html#requests | ||
| /// | ||
| /// NOTE: | ||
| /// - all the params is unimplemented. | ||
| #[derive(Debug, Default, Deserialize)] | ||
| #[serde(default)] | ||
| pub struct PutParams { | ||
| pub summary: Option<String>, | ||
| pub details: Option<String>, | ||
| pub sync: Option<String>, | ||
| pub sync_timeout: i32, | ||
| } | ||
|
|
||
| #[derive(Debug, Deserialize)] | ||
| pub struct Point { | ||
| pub metric: String, | ||
| pub timestamp: i64, | ||
| pub value: Value, | ||
| pub tags: HashMap<String, String>, | ||
| } | ||
|
|
||
| #[derive(Debug, Deserialize)] | ||
| #[serde(untagged)] | ||
| pub enum Value { | ||
| IntegerValue(i64), | ||
| F64Value(f64), | ||
| } | ||
|
|
||
| pub(crate) fn convert_put_request(req: PutRequest) -> Result<Vec<WriteTableRequest>> { | ||
| let points = { | ||
| // multi points represent as json array | ||
| let parse_array = from_slice::<Vec<Point>>(&req.points); | ||
| match parse_array { | ||
| Ok(points) => Ok(points), | ||
| Err(_e) => { | ||
| // single point represent as json object | ||
| let parse_object = from_slice::<Point>(&req.points); | ||
| match parse_object { | ||
| Ok(point) => Ok(vec![point]), | ||
| Err(e) => Err(e), | ||
| } | ||
| } | ||
| } | ||
| }; | ||
| let points = points.box_err().with_context(|| ErrWithCause { | ||
| code: StatusCode::BAD_REQUEST, | ||
| msg: "Json parse error".to_string(), | ||
| })?; | ||
| validate(&points)?; | ||
|
|
||
| let mut points_per_metric = HashMap::with_capacity(100); | ||
| for point in points { | ||
| points_per_metric | ||
| .entry(point.metric.clone()) | ||
| .or_insert(Vec::new()) | ||
| .push(point); | ||
| } | ||
|
|
||
| let mut requests = Vec::with_capacity(points_per_metric.len()); | ||
| for (metric, points) in points_per_metric { | ||
| let mut tag_names_set = HashSet::with_capacity(points[0].tags.len() * 2); | ||
| for point in &points { | ||
| for tag_name in point.tags.keys() { | ||
| tag_names_set.insert(tag_name.clone()); | ||
| } | ||
| } | ||
|
|
||
| let mut tag_name_to_tag_index: HashMap<String, u32> = | ||
| HashMap::with_capacity(tag_names_set.len()); | ||
| let mut tag_names = Vec::with_capacity(tag_names_set.len()); | ||
| for (idx, tag_name) in tag_names_set.into_iter().enumerate() { | ||
| tag_name_to_tag_index.insert(tag_name.clone(), idx as u32); | ||
| tag_names.push(tag_name); | ||
| } | ||
|
|
||
| let mut req = WriteTableRequest { | ||
| table: metric, | ||
| tag_names, | ||
| field_names: vec![String::from(OPENTSDB_DEFAULT_FIELD)], | ||
| entries: Vec::with_capacity(points.len()), | ||
| }; | ||
|
|
||
| for point in points { | ||
| let timestamp = point.timestamp; | ||
| let timestamp = try_to_millis(timestamp) | ||
| .with_context(|| ErrNoCause { | ||
| code: StatusCode::BAD_REQUEST, | ||
| msg: format!("Invalid timestamp: {}", point.timestamp), | ||
| })? | ||
| .as_i64(); | ||
|
|
||
| let mut tags = Vec::with_capacity(point.tags.len()); | ||
| for (tag_name, tag_value) in point.tags { | ||
| let &tag_index = tag_name_to_tag_index.get(&tag_name).unwrap(); | ||
zouxiang1993 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| tags.push(Tag { | ||
| name_index: tag_index, | ||
| value: Some(ProtoValue { | ||
| value: Some(value::Value::StringValue(tag_value)), | ||
| }), | ||
| }); | ||
| } | ||
|
|
||
| let value = match point.value { | ||
| Value::IntegerValue(v) => value::Value::Int64Value(v), | ||
| Value::F64Value(v) => value::Value::Float64Value(v), | ||
| }; | ||
| let fields = vec![Field { | ||
| name_index: 0, | ||
| value: Some(ProtoValue { value: Some(value) }), | ||
| }]; | ||
|
|
||
| let field_groups = vec![FieldGroup { timestamp, fields }]; | ||
|
|
||
| req.entries.push(WriteSeriesEntry { tags, field_groups }); | ||
| } | ||
| requests.push(req); | ||
| } | ||
|
|
||
| Ok(requests) | ||
| } | ||
|
|
||
| pub(crate) fn validate(points: &[Point]) -> Result<()> { | ||
| for point in points { | ||
| if point.metric.is_empty() { | ||
| return ErrNoCause { | ||
| code: StatusCode::BAD_REQUEST, | ||
| msg: "Metric must not be empty", | ||
| } | ||
| .fail(); | ||
| } | ||
| if point.tags.is_empty() { | ||
| return ErrNoCause { | ||
| code: StatusCode::BAD_REQUEST, | ||
| msg: "At least one tag must be supplied", | ||
| } | ||
| .fail(); | ||
| } | ||
| for tag_name in point.tags.keys() { | ||
| if tag_name.is_empty() { | ||
| return ErrNoCause { | ||
| code: StatusCode::BAD_REQUEST, | ||
| msg: "Tag name must not be empty", | ||
| } | ||
| .fail(); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| Ok(()) | ||
| } | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.