From 02b18fbca14a86f48d0d6ba7e6d8338e0a7054df Mon Sep 17 00:00:00 2001 From: "Lei, HUANG" <6406592+v0y4g3r@users.noreply.github.com> Date: Tue, 5 Mar 2024 17:47:32 +0800 Subject: [PATCH] feat: decode prom requests to grpc (#3425) * hack: inline decode * move to servers * fix: samples lost * add bench * remove useless functions * wip * feat: remove object pools * fix: minor issues * fix: remove useless dep * chore: rebase main * format * finish * fix: format * feat: introduce request pool * try to fix license issue * fix: clippy * resolve comments * fix:typo * remove useless comments --- Cargo.lock | 11 + src/frontend/src/instance/prom_store.rs | 46 +- src/servers/Cargo.toml | 6 + src/servers/benches/bench_prom.rs | 21 + src/servers/benches/prom_decode.rs | 53 +++ src/servers/benches/write_request.pb.data | Bin 0 -> 189265 bytes src/servers/src/http/prom_store.rs | 31 +- src/servers/src/lib.rs | 4 + src/servers/src/prom_row_builder.rs | 272 +++++++++++ src/servers/src/prom_store.rs | 12 +- src/servers/src/proto.rs | 304 ++++++++++++ src/servers/src/query_handler.rs | 10 + src/servers/src/repeated_field.rs | 540 ++++++++++++++++++++++ src/servers/tests/http/prom_store_test.rs | 33 +- 14 files changed, 1318 insertions(+), 25 deletions(-) create mode 100644 src/servers/benches/bench_prom.rs create mode 100644 src/servers/benches/prom_decode.rs create mode 100644 src/servers/benches/write_request.pb.data create mode 100644 src/servers/src/prom_row_builder.rs create mode 100644 src/servers/src/proto.rs create mode 100644 src/servers/src/repeated_field.rs diff --git a/Cargo.lock b/Cargo.lock index a8de2387aea..e031e23632a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5921,6 +5921,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "object-pool" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee9a3e7196d09ec86002b939f1576e8e446d58def8fd48fe578e2c72d5328d68" +dependencies = [ + "parking_lot 0.11.2", +] + [[package]] name = "object-store" version = "0.6.0" @@ -9054,6 +9063,7 @@ dependencies = [ "common-test-util", "common-time", "common-version", + "criterion", "datafusion", "datafusion-common", "datafusion-expr", @@ -9073,6 +9083,7 @@ dependencies = [ "mime_guess", "mysql_async", "notify", + "object-pool", "once_cell", "openmetrics-parser", "opensrv-mysql", diff --git a/src/frontend/src/instance/prom_store.rs b/src/frontend/src/instance/prom_store.rs index 5e21188294c..22402bebff2 100644 --- a/src/frontend/src/instance/prom_store.rs +++ b/src/frontend/src/instance/prom_store.rs @@ -16,6 +16,7 @@ use std::sync::Arc; use api::prom_store::remote::read_request::ResponseType; use api::prom_store::remote::{Query, QueryResult, ReadRequest, ReadResponse, WriteRequest}; +use api::v1::RowInsertRequests; use async_trait::async_trait; use auth::{PermissionChecker, PermissionCheckerRef, PermissionReq}; use common_catalog::format_full_table_name; @@ -174,7 +175,7 @@ impl PromStoreProtocolHandler for Instance { .get::>(); interceptor_ref.pre_write(&request, ctx.clone())?; - let (requests, samples) = prom_store::to_grpc_row_insert_requests(request)?; + let (requests, samples) = prom_store::to_grpc_row_insert_requests(&request)?; if with_metric_engine { let physical_table = ctx .extension(PHYSICAL_TABLE_PARAM) @@ -197,6 +198,38 @@ impl PromStoreProtocolHandler for Instance { Ok(()) } + async fn write_fast( + &self, + request: RowInsertRequests, + ctx: QueryContextRef, + with_metric_engine: bool, + ) -> ServerResult<()> { + self.plugins + .get::() + .as_ref() + .check_permission(ctx.current_user(), PermissionReq::PromStoreWrite) + .context(AuthSnafu)?; + + if with_metric_engine { + let physical_table = ctx + .extension(PHYSICAL_TABLE_PARAM) + .unwrap_or(GREPTIME_PHYSICAL_TABLE) + .to_string(); + let _ = self + .handle_metric_row_inserts(request, ctx.clone(), physical_table.to_string()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } else { + let _ = self + .handle_row_inserts(request, ctx.clone()) + .await + .map_err(BoxedError::new) + .context(error::ExecuteGrpcQuerySnafu)?; + } + Ok(()) + } + async fn read( &self, request: ReadRequest, @@ -276,7 +309,7 @@ impl PromStoreProtocolHandler for ExportMetricHandler { ctx: QueryContextRef, _: bool, ) -> ServerResult<()> { - let (requests, _) = prom_store::to_grpc_row_insert_requests(request)?; + let (requests, _) = prom_store::to_grpc_row_insert_requests(&request)?; self.inserter .handle_metric_row_inserts( requests, @@ -290,6 +323,15 @@ impl PromStoreProtocolHandler for ExportMetricHandler { Ok(()) } + async fn write_fast( + &self, + _request: RowInsertRequests, + _ctx: QueryContextRef, + _with_metric_engine: bool, + ) -> ServerResult<()> { + unimplemented!() + } + async fn read( &self, _request: ReadRequest, diff --git a/src/servers/Cargo.toml b/src/servers/Cargo.toml index 69f31881549..a9363a0efe7 100644 --- a/src/servers/Cargo.toml +++ b/src/servers/Cargo.toml @@ -60,6 +60,7 @@ itertools.workspace = true lazy_static.workspace = true mime_guess = "2.0" notify = "6.1" +object-pool = "0.5" once_cell.workspace = true openmetrics-parser = "0.4" opensrv-mysql = "0.7.0" @@ -114,6 +115,7 @@ catalog = { workspace = true, features = ["testing"] } client.workspace = true common-base.workspace = true common-test-util.workspace = true +criterion = "0.4" mysql_async = { version = "0.33", default-features = false, features = [ "default-rustls", ] } @@ -129,3 +131,7 @@ tokio-test = "0.4" [build-dependencies] common-version.workspace = true + +[[bench]] +name = "bench_prom" +harness = false diff --git a/src/servers/benches/bench_prom.rs b/src/servers/benches/bench_prom.rs new file mode 100644 index 00000000000..df052844f14 --- /dev/null +++ b/src/servers/benches/bench_prom.rs @@ -0,0 +1,21 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use criterion::criterion_main; + +mod prom_decode; + +criterion_main! { + prom_decode::benches +} diff --git a/src/servers/benches/prom_decode.rs b/src/servers/benches/prom_decode.rs new file mode 100644 index 00000000000..4b40683cf08 --- /dev/null +++ b/src/servers/benches/prom_decode.rs @@ -0,0 +1,53 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::time::Duration; + +use api::prom_store::remote::WriteRequest; +use bytes::Bytes; +use criterion::{criterion_group, criterion_main, Criterion}; +use prost::Message; +use servers::prom_store::to_grpc_row_insert_requests; +use servers::proto::PromWriteRequest; + +fn bench_decode_prom_request(c: &mut Criterion) { + let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + d.push("benches"); + d.push("write_request.pb.data"); + + let data = Bytes::from(std::fs::read(d).unwrap()); + + let mut request = WriteRequest::default(); + let mut prom_request = PromWriteRequest::default(); + c.benchmark_group("decode") + .measurement_time(Duration::from_secs(3)) + .bench_function("write_request", |b| { + b.iter(|| { + request.clear(); + let data = data.clone(); + request.merge(data).unwrap(); + to_grpc_row_insert_requests(&request).unwrap(); + }); + }) + .bench_function("prom_write_request", |b| { + b.iter(|| { + let data = data.clone(); + prom_request.merge(data).unwrap(); + prom_request.as_row_insert_requests(); + }); + }); +} + +criterion_group!(benches, bench_decode_prom_request); +criterion_main!(benches); diff --git a/src/servers/benches/write_request.pb.data b/src/servers/benches/write_request.pb.data new file mode 100644 index 0000000000000000000000000000000000000000..19219dc4ee1703c4e818c5541d09a6147d1cc5da GIT binary patch literal 189265 zcmd^|378yJx%YdLNitJpnREh#0AUhB2!YP@zKjY9346eh1wnTi2_mwtoDc6xq7WllFOnUb#NP?Iw%2HZX6j4d4{j$glN>WjI zRpj>MII;PYQEo0bd6Sg#&d~p^I8>BmRg}5u+%#D%=CWzt+vewH zavicbkX5o$!8=LzbF;bW<)2;t(cZRo-mXrM{?FVMUAH`X?Rzf;xN!rwz6#g9mB6Xo z0+VoFBAk*;<)rjxQ9zW-JFP0oIio&RE)@#dLLcT+t1S{DFMUHB=Qq9eV*Ra^biXKQ z5t6JhBo#3&6(DR>N#%1~2RAugENM}I7eLS;uYdKjyMoWmSXq_E5nBM4ZTprt*Po$w z6E=usdfht9x=jio3-`@!YvgxdUw{OMtX$Xeldhki??QqL;x_BA*Ph)m`|W*^;6k`? z_r7@Dsq@~RhXfbKh2xe^|H91dY$Uh{F5E}&t&6oCu!O)lauVc_TYS?CfupW9TGbI7 za>S*7_TAs#y8?*~Ip98i{Y6*4@r!+M+7vn7t_u9@>(3pG#1_Tv3meyjmR$UU`8c}8 zaM{|||KE3R`xBCGaa^`b|9j~H$A8=y8XWc01TNc`t}OiUjirWe*R*kmnzFb9>asXB zn->*G+_n6lWdi5%Y};8p`Ee3zcCnB%5SWDQcy;PS1nXK#wOFNXz_sru?t08Q7rtJL z7oLNl7xKsn;dOq`HUzzpM@?wQsh_{``_9`DdBg<0{_~0%Mt8Iw_sF9q=(Y2?#Sap_ z?Aio=c&$C_e~j?5i-jLu^SeZ&H`>KQ4=&GxYrp4QE#%m0^LQS-@eQLd+EoiV34@R? z{OAGaY9Z%epw`oS|G~Lh$f*|?*NPXOcCHq3)&-6D{%gyOp4cKI9OSbq)bHG>ogMbv zLsxw`=W?rcz2bz4Oz-i`qVi1c$8Lp$i`Q)!b7q zLV`mcFMqZAiaQ55Eai|#&7Y=jofW=i8ul*@@}Rg#lJN#<2p>#k;V+#Ifx{H}LuD$T>1DQ=PPiO$Yn z9`VYDhgX+FZ?gwF*91NS&2NvU#1!BzC3e_e3* z&;Q3JL{P=F^*TS+BaQ?@0N+EMdLH9&4AGRa>>krF?ToaMm3QWI~A& zqd=LF#j|ipJ5T=M%@?=dL`YHEUdSq&wS5xEaeD60rF%~L8FJeT`7ExYSKVKD`I;F9 z<*oK2bhbG{+eLv-<6>U^&1E~%r$_c04P zJ(EqzV!EF%6vP}Z?uOZ?o_PJ;@00NfWgyLDb7BT{r&U+y@sDr+;|I(7iLQ6p({*pN zuABy{3@fUb$5r*dD^h3v`HEp&*lZz_Eo4)C0a@99+_P5thU_XzgQBdIipAV8gT8d`zqa3hPQWlylx|;?VH15xMh=15hjyy_tTNIQl$i#n{a_0W@-kVru3{7`B^PEGH5;iJLfLBsS>Bqh7zaV1q=?Nb+=XhbT?P^yTz zY@sxSOL}AMxRVq2Wk}xTu$QUOS{bi7l#>QDNlAB|ZP!_Z{=f45Rm> z^eB8m?=L)c&yrIV()K7lN`)FLgYrrHW8tCp|1u;+iBnY^QnfiBZtK7OBEKxs^A4Fq zqfGhLeo@Zzx#Sk9fQ-B6gIkt;w{MU{0m@9eBovcpmBf;WoVC3Bz^qk^b{s`uQRY%5 z?YlbM7&zkAJDxiH*M_a4)KLcV_#SZUyjXtAi$<{*WqvY*JwMsw&S&O+zo(BhI|}6% zY|39e`~2^>DsPdF|Ft&m5YsBu{*70mlu}yzj?;z3C)n!*|GY(1guSB_e9YIJ$mf$^ zrQ%|~!c;HYbyRpREG!Y>U=tQagEC%5s+a91D!AHcW3qZw4|p>bU~RiHS~RLhbke}+ zYen^NH&W@xdWoo>&Pn(&SsAJaq-$Gu7KDhiI2p=VEdSJtx1B2X+*$RhXT074Bb&t@ z(!5twq1AR>?4=z7<)p_lS;(nxP$zi`gY}zwJSX}wS+=PM{5nn`hYY^30N#| z)br65S!xq{^i_>|I483WsB5UO~Pgf1$Ehuy!6@J64S%yH+ncskR=&)uQ+-77H+ZW>GMWDo#C6els!3C zFTQT&r&u=mFn6Fy^u23Ehi5a?{KB?o4ZdO3S_bc3Jv6k7ddb&$z(o4)hL)hxOrdFOK*9Q0hB z9VX$~YmVD+6azaT&p)SG+*O{>&f4}ZNXUc>FY_L%xrtjEH3h=RBQ50C7en`574G`2*V+hFStt-Oj za2-b^<3|(JGfW(55@Irj*@+X>^GO_Of_fSWA+gbk*@TG=5H?X_1B6YG*Z>(fF@kzL zjgw$SJ&VQ(2&{aWo5?)Q78B}ltTT1D zJ<1a%Sn9N{6Ps*Bhr0QgH?t0*@Rj=NL&x3g5DIm0(v^Do#oIPHghCyt%5i@8`A%aB zb|2OZU|+I2|uD%+)^?k$`%uTK)pOS?ocQE zc54Ya zXDV?otdzLRwH^I(i=+Zy5Q7(nArA=GgWLm|{b4g&~v zgu?(r9o`6#{a()|gzWRMi6HwtYy!wW&v=U;)YJ$eo>9{x1Rm6s2!RJR9U?r9b{bW; zWG0ogeqgaE%aY7wl>jyqE;8j{d+9ltdB0^6jzTOVJ{oQqZdZqkqIW8vE@ss&TqoDA zooE5q&Q>q)^@Pdt-z9C_z9zg`I$rCImL1%rY|%SuAUw*Q$9r&7^rI=@isPZ?7a$uq z`Z7mR-9j1dCg?Fxr8SCmRi2;JC|2|fv-R@_=W?7N<@szu`z7zxr3Hw(L+B!VHa9H~ zCxT}oT&E7jqseHPJAv!af>QsPN92cnDY205_ra$mc@er%Nh$wAo|iZK2J(C#d`Il- zQx!O?J16=o_ip?AzIbsUQ;c)_X?JnwL|zfSvm?s`k>!40BH)9+B+A)z^FZagxM^^3 zsm?KFjebg0jGHz9XR&Elc^`IQz#s5!OqB|1seAp2Yr8j;#O{-%f$l)KI}qv(`Fs8G z?u{E(bqD>y5b1};IT4LE;*>Y*CM7AK+?>s!9`!}c@m%fzQ}>^vcmK+Bi+vfslvC*% zu<2#z`g?EN_TXzTk`mhajqH&C*yt*8f>7+sqHet^R}R)baypTq zX%iItq!M})cuu?erSFjEH@d&Q`ufUX5%={;;FX#L9y}HKt;u8SHs4mhNK3-u7u6dI zDw|ypZ)hhE-%{(J)raT#p=3r-usfQz)5lL*c`JBdM>46qb<*2aeYR8-3#=Z#i1aWR z+H1Z|sXiUfh0KDrtCbzk%=TSf(noZt-*;6-fsdPHHK`TFs)2yp9uNQZlD|AU^gQ8! z-6J(CcU6votgM#c5VGLczGLL8^NI?p*G za>n`RZl z)DKPM3x!6TiqGIW&MNT*6%I*-8cp*_C{#;!JbLYWF9o|bsfg}p2>%Pg#8C1ACEw$^U(Zyd8prId$ip7pqGKGi3I zdQNSgJIce;5>!rDJbsYd$K;+fw8u*_*aeiB6vXH0(|RzA{Z>>vD75|@)03yclPgs+ zB#Gh2lk!tXpA=XgNGuPSN79N@fNj=%cL^XcxzG-C0JGM{-fcx_)6g|YSU zEQPC)E71XYR?Vl-W<6kJV<*HS?-@i0=qojoVKYR($QNCUQ>YHqd=xqYBny*IMv>t; zGj3fF{Tp3YAZu3=kY?ebw~Un91UG#$u+o#iPhzxLpVc8MHGP4VAf}eNsP5~q5fQ?! z$9T4#NZJLgqo!zM#(*1^R4ps5DQ4k$2pQS8)Se$vgCI$pOGSMDBF}~o`fCoaFh3m5 z@P|nS`MtufIk(n#F1XuE)^&YPqtC*Ga4S=vv|!&D4;z!X(47K6><0 z!qF@~ASs0mjOp0snM+8TDN-h37#}8%^J>q+Fq;TFfb9eDWm6>wRJpxOF2(_X+AoVd zEJ@)9WQu&^YG{k8{2jMbW11 z+wjy+YFAfuUWO)RpWEgF?__ zJH~kN@r{>YjHm4f8prvlLp;{`ls!E6@=>RJ<6S=|!>*P2hEx5VjJs~|IT?7};Bzwa zHoU5#E+2K;zO9DdntZfU%-eP>shwC!4(uV4f`OAB;sTzN4&nlylm6iXo|Eq30$!`$ z;WYDe*fm%urVjB~C#DYZoFt~O8M0=hK)GNGA8){R3E0+LNDn79I#|ts4<9?Y$?#o= zwxDI*aIB~6ZS|~%VcF( zA^om8r(+vVwvgCHlPx5+!DI_*9L5rM)`Sq=*jW<-5_Z;vfP|ej)gr-%>y2woecJc) z)z3{w+>Jw*fw$+ikKEe}u*0wUdabc3#Z4|aQ$c3RMVqR)rI$@E8oj4>pJ1++O)lno(bgqw7``q4 zvdP6fwX!N?K?zjoE?@CGY`bVn9&LYOE zhzqw}_upg{(KZ(C)EiC>FXpmoUfWCt#~{k^Nv?kE_awP;sQ%l+`>zr#aMhZ_Ia;b^ zySmeo>ySH7c?*FHU9ZP4xZ%$TT(l{EOT8}p1Bbyyo3ggz+P3V2G)6npriQJ!JdybY z3@+LftQFU`dD6)Smu;Is4=?V)+y6wu-ZmC?bb0>IWt;xjITm(sZ96Ws8KDu;V{6*8 zhah0lX3?!e-t%doUkR<)3GNI)S^wjT4E)phW;N> z>-xI$#+&33?03Cui=(gMprL0IHE8uG$Xim4T?U58HlB4uA1V=bLmw{@*{}!A+HUHs zFJOujcvF&LSq|kj?mZ!nwW)8?eaQkkD9UZAfS=vbGwuk*5)lycV?j{Sj@RR#^+y zu8(N3pT4W}u47*L${Fe!Qfq^22ic*)ZQ=SS2vf7tAfHuR!X0q&pH6<@oG+1+h`z`Y z4o*SR;U;J0vs#dmn*qSj*r_$W0%YZbjjqmh4=lL&PtSdfXmuk?v{@CjX^NClTkF-; z`IQ@PxZ$svsU*zcb1rtm&8W~#QB;x7S#3*qA|DP}SH*k_rmoIIK6~ZEe|_zHq&3|M zGz+^lx;l@2Wy{<}r%WeJNr^@~L!mLVhyS{CzmtwTo}f`SDIFTEnSpF~9dqHuq)92w zj1JBPGou5dGcySOrJtP@Ct6XO866t6nQ_0lrFi=4#|RyznbFa?VrFzmB=Rkq*;WYr zh4a$jhjJ~{q8517;a{<6WC-UjcPGB(?_Ts zUnGrsB~#_>_?b&>X<^HFSEn|dFa58TL@hYikYSurJDN78-*uis@Zfk7hIsZY&86_S z=xJwtg8R*ZXaA5W=2Ea(GBrKe7A~-DYILIk8QCq(H#3>clH)&R%r|LUm5z< zw7bs8(P&MLj?N8Jqaz}1Y01>IK5$F1hx7qjQ=?;}HZ?urp7J$3y7+AAMyxR)qKYie|KB-kvQ+K0cNW!uzfmjh@rH873;IQljFnHn7%X;q7vnpQ+x zaHmm4qct@;IyX#>j)=sWB~ybguw`x`O8Nk;snM}fo0@L9dkzphT2rILbH~)^m`M0r zG_{{S_0&_BZ~hTcjMmiX=t!_xGBvGed)2}F+BP-Xg+v-tI~`1I*@-6*Hrh;$j*Yac zMN`vrj~(_iH99&sOpT6+#F-^iD=!YjNFSgzH99tGQ`1%lZaD1>f=6p=ba?KV8XXe} ze~YH3CEhDWlCS~Rs= zi!i+ZkNr%Ij?N8Jqaz}5X35l`3w(mxJ4pHft*OzmQJb2cc0ZOUc(kTQhv$x|(J_(m zw`gj5-Rf`OCyKdb9F|P&e_;;w*d1NAO^tT*5~Znqu=3So2^(#uM#n~4)uO3s`S(-5 zJfE~G?evq5&J9zeBcd}ky%PTuA5n|e)acl#O-(P5Ui@i-M{89V6 ze!h}?kSIoLYIJlY*esbEEK1$}Z_gh@jDvDv*5=IE(mDZb{R-T$&X|2tZsnjuBW-HY z(mdKf?_5RNlyWNv9h@CUbEhFKWoi0Wj%N-hT3H;Ns(T;lw4(MiElIoS$^Rg9lv_FI z=YTL}{=II2@l9|B}5IN;g+h#_GMw--OW~SGKp1YDXDW#dw z!MR{&bU>sbEt#3VKVso3q7`LkMu$diX4%#04s1v8@qA~9yk%yi>l za2T-*N;9KFqc$^rnt72x=qSyMj?NV`qeCK*Z_&)Ot$O?X(spJ>2S)nPi+nRIB> zW~Nv4UMbl%Gdem~%#03+M7~8c(>9U+;qSIHGdegDXqL?E5jX&0&cJ-TH8a}ti)hRY zN-c}_w_h`(LnBRU(aiL0<6pKjGdegI%#03*#F!;B)3ex&@%<)cW=4lbZD!^DvuD~h zGdem~%#03+M7~8c(<-2m*KB8IbZ{ilESZ@$F1==|nQd=$*MN{!HYee~(!4Ar)ofm@ zyY+lFH&uYEkkX=eT45kB`U|vQn>N1>;liCyZa?bYED5)p+BhHAkxUkNAW3=`>L>e~ z8AU+@R3%xdT+7ZF5VoADTkm|Hj0j(EIz zA^f*C?oboq0SyTAqTJ^W9yzn^_BJkNqB_)|%H&E)e^M6DDv62;t?GIVKJ)kWfB42f z&n7VlhP=##x?prvx0?5iTVteoztP6cH_^>B=(2^3K)MdFzO-9gkl7-b(@QqBd?n6K9u+Q%V-3LFc5+cu3sAPVP~9LslAt zSi5Fk`zbS@e4NA@v}V|(($X50^sX={X8ZazGXdL^3W7_v_xeA+^>*<19}~{&+PEbq z&iMwXEb>A!HLQv*kwKi@y!>iFZ5kn_)-%j>^{^`H3Kk$=8y3r$0100j#;+J!3 zKB-x1YT5m2TUJTBH&%3YY97*k#@~r_qYt)&a#j_`p#MVe_1*h`@mc%V8r=h_=PNql zShViW-fwRuy#q>KhnQM-f1`CPd@eWdiaURwvq>}@I z5Vw$ z9od4S@&%Y@bo5J#>I;Rm6}ZWpq?ETTy*N~qWKalxb!JhPgi=~fX83$IH|(9X{wPiN z>aXhW+`c(m5Fk_9qx!4L=8|`hs!Z!o*a+h;`5UEDR`4$71tBcPf-zqjwzT`g5kBSP z(}|2P&WEC@NIDbY{XDmTn^paZ4K-_;CIRJ)^1L!>Y6WlE_&lcM>Ebr-U{mnzs|TN0 zP-Uq&oa6`iY>rRm#AGU~D)2L{YuA*Ha1@CFH{~cwQZA`vx46LZK^wQ&q{qCn9+ikl zrb^kIQ0|vAxv41`!l>Wde(X^zS971>W>*47fGI}hO5de1fe(a&fmk$<3Z>#gCY=z& ziKrOhqkb{Uhhx!9wkzs|gx*Wt+PDAe!r%$UE+f|_^f(%(fE z6)88c9HvW|3GhZGMefOEQ$27@ehFfp`i&kL^EUFBFH;eTADQdo$p&Kzj*H!~R)FYP)JrG$cpDl>8(36$4IIH}><$ZE7y?h|BJS}H? z6uuy&q#;r1@kb*Offr-ZbRZT;hxuq!^r!v7I3yL(SW1kgGLcLwBZgDqu$cCzcs>oe zM<^(W(P-2ki~D=Dx3D&JUqzDZU0N_c#`6g=kcg$@sX!pbi~gYC4+g|I=#vq`Q9k64 zgav;p81zR1u|zD$XJP?COsCVC1RvpdJHbNHa3bXwMIjSU#WR^mFa`k^|g#?F0)4B55HJh=zqoEE9~0sh}vNqp5hv zpAz8TgqV;Lu|OgniUs0ALf{jDv=E5!@!0O?VR1eXPVxQ-&&Ptn zU?l2~#{)1yOGGo_creZfBSF7E7K;ZV#KL}!-0U1i9#lk38n--6Y%qy2z1kQIG*t*cE3?F z6@kiEijRVW$HC`=kimuoJ_X4JWLr?7OUGlONG9VK{V+y`{ZLa8!~R%G@P~t`SRx(X z?IJ9l5F)$~j0S?Ch!FFq5*g@fd`yItKA09^9sub+{7xbvLP7>ZWGtSDrQk26gapqE z;h-(SzE3g1gSk*;J4akc ze+OsvwDJ5VbagcHCeXRZx_i{yZ0q_Nq5tM23i{o!ZZ<)LVqY(t7(%&FCz}{TxlkXQ z7(%&F7n>MDxlj+AAVRUPgG~&fT&RCd5TV%Dy(WfGF4VgwhEOilxrk8Tp_zbO2*;)cn&HHE~Yw!cN+v9r09f?dFBPf-$-o{&Et2&aXh z4+`mmFPwngrHNEH?hA&4LMkGNL0CEM8IW?NJnR_J{-#`jZ^)G8S?OHZaJmV;c@qWw z*L{333ma3@qN4cJ!J$~%O2w~cL*aOr3RYWSEk2y``TZfkFC2}=d?{FF@I`{*R6Lf* zz@kiwTB_op8p%yKsbCijti1!TH0q; z6IEXyyAD{Y>41HWJ76jr3&g}k*ay3@qCQxy3i=YTM~NyiUsQ*E zLRRh>$n$-$58DU({`m zn-TtO_S`4LP>Cr4|^r(sz z`f>x*Qa#=_%2x-KP51f&;rLq)tb9uIlPk#LENp*n{P~0nFJgB=aO7LSM+0N_7S5Ig z$M?I?Eeei&mmiE_yU;HSj^u}N)%B9#lv zgA+w67ZwO7ic~Hv5l$GX*e?=J6scTTCY&%*v0o^hC{nqwR5($ja$&J>q*VW5X<~-E z4pcuOv$!3;W|r~2G2phjU^qTSZ@pwVlKZEPPB$}cf!F^vnUO_tJXX9u`skzU68BtA zDqc^F_1oAld)2?n(_{TAPHSJ|{)K1@VD&Hb@L0D{z3H~z9N*Ph{}TT`sln-njr;%Z zS_<=+OJTdRUM|(cCW>Be6~!isUT#&!CW>BemB%KEUT)RMCX8M#70M=xUT#&(CX8M# zmCPoJUT)RRCW>Be70+t)f+Ez&)7~4;hsH+*?f!0Ko&HB-1@EMcHa_LISWBycZ&^!& zD%!X{to3gXJ$fcNoup+gjrH5uucg6$+L~8sSxaO6Do$%@7!xm8P>AbPo0OPe5ixmHV?AbPo0OPe5ixmHV?D0;b7 zOPe5ixmHV?D0;b7OPe5ixmHV?AbPo0OEc@$vX*AdhsH%M&0{{CwPh`h6}+Qbn)z(j zaU;LQTG}@A(X4Pb?#OGeOm$f80!gWm8y<3#G4J zUnCdmUAw+WE)=;YfJp2sTf4qUE|jw-fJp3XSi8PRE)=eIeUV(KRuPfzwmP92>k<_q z{_fZi+pJ3w>~SJ=>tYlkZndAQd;$D099260s!;luUVmP5bl&oQF6M7zw;(n0Ra))m zV*V=5YE!$w3(=OT;IqP-U~9FXi+Kyt7pg|KN~`@zrkB_aYgYQV&3o& zHKFu!p^`PB^m3u7HKFu!p}sYt^m3upHL>(!U-g<$dbv;ln^=0WuZ2x0y<8}ZO(?xw zsF01+tJQ`gQ+_|z#WEefWqsUGd|pf@ zyPh# zANb&it{snF``$|duEgzadg>m=QzNL4{oB&dUit7}+gDT{JKD?QK6UHdyN-Efi-$b* z^KINArnlW+f7`s6m*nB(%Cwpt5R-gCDpWqaMUFF%=gy5J&Ms(Fn6D*C>D6Lvh@|*W z0%1W8Z5t&cIo?egB$u{vhngDZfSQJZ7CE-mC$5D%g%n0C9#20}<>EH(U=z!}^(?G7 zpf38+Yx{jfIEEYSfvZHwkx~iF4_yngnsH$pcaTYfeasRZS;}NYnN>q9{mfHDfM$JU z6`$@G1#|_~?9uZGL-q~G zQyQXsv*AK}d{IlMvRm-8i0W)~1u=syqh}KuiJD8Xi~VMq<#7d`Y0|G{beHUFWOQH$ zBM*$KbVR~}?E>ZIxa-MTWTdhr3A@C9+Z!ME5SC_9dcry}o#V54KE-Sf^4^kXNR&3q z^iI(38P2U&`810GybV$s&#gm*q*>z2Xviu@$@QoGoIo^dnG?nQ$^kx`W7je(-dQ_` z1WB{$2%u;q7rKFz1^J`+CK04r?*$6o$k|}*>{+y*H8FBhdNYGeqGIXqZy{owH~L)U zq@&j-H){1c7TuN%Z=Y*)b9}{1wHf*b?eVN;sh(@jBT6*utZPL%T`aK)3Xi92fIu|s ztn1yi(Pj;^eq#Y9hnlA|(xm(0yNM<+4LZz}e0Q6Z@AdrlD2@k*dkL~>{ZhiFpMxl*m>D62*NqsQWL>`H3Wlvkx@a3L_(7}*m`(8 zlQE1uyva;NdpMh+i#)wqVzsd#4rJ48#tN!Y>y$j7gVA$Uf2n|<;t?sD`E-SVO=D;~ zR{sIhkspyfqFJj{P&O*;a)}>ZNqT0JT;fx5Ruxl_b#KTQ(LI8+N|WwE0N6eKD~-mf zf~XF{l4p_^1X)y+B(EshzCuzRE{fbtZmOX58qwP>s{MY%(nY1mcsj_6*_E_#(+S+9 zoP>x*8fynF(O!gT`T*z^&G_TNL?DgGR+Y^-dci4VDi{l;Gbw&tLkf{_G!_VE#y2FN zh{iI3(71*a!+ewv3T&JEi)~!DDFd3TXFye-N%OLth5Zujy~XGxmsb6C{K9FpmNzM$>FQ1vwn_EuqR-1!nc z9xLM@iO8+=OzyddA)RwfhF*e+$VHCMHAIF)+8caUqlI9Nf@#1Zl9~y7d@Y z3Rt!+M&|zWEt_EOnUEnO5-UR|LT6=YNBFGlJPDnZp)nb(UvJ|ACSy9Np)r+;V`xj) z*dk|BOFpU%m{_l6)XCDqouQI*`{0vPG$v$g&Q*AfsOGa z7x6}tF!va!Nn7DyZz@C?dqfP(!yyp^lWN93}F&Fu3A&<9_?9Im_k7|8i$tzD zg( z*V53iw+j?523CQ>#lR+TxbQ6kg^Phb5U$^~ar>L*8nbKX8ikAmM{Z54>^1k!_WOHl z{{wUL&Xf#iQKflBZS?#0JMLVrQ!2uFGAgf%-f8ghMFF;ZaMQVIvRKSz)4aFM4>r{y z!-jVyD;2zxWWTpMb1`NLZ_>SAus0m+<3^R9P+kj zN;$eC$!3|~cv7iI{j%>+y=<88jc0lzVlN-)&4|6JM9Y^Q&G&YpmyU_OVz@UF>P*eQDxAyUL{vWG_c+xSBxy+TnlMu`rlloVvFjl(Dez!pXdF4z3SR1kJV$j(Nv2-&#_8X+@N!6IbmEha)J zQH?y7i+GEuCL#Ms_Q?`V)!3yXWU3CQb6KTfWhR8>->kE+HxL9W2Bv{P#lR>qsPN4J zfr^145GvU0F?!D{%91LjrJP$wFc`J1tF2?PGK5O1*nJ6^mcaL-R<&a93?u7-_VE~4 z1STE>uRz7aHw{cY2JSHu5Blq3Q>}eoO|4xiisVckcFBYe)t$WAIL#3bnXv}tQg20p zW7Stt;8^uk95_b(6a|h|FRg)t)OPd(4^){iDB7w1XO+Z~n9PZVKDFPC?@z&X;tz-vZclhx?q$((j65h6lWhc0qz=UZ*ueA6^%W_g-30C)Cfwf#h{Q4=kwv1KojO_tKOkZHBW#^M|x}^?BU10oZ7zZK9e{`}g+ax+=c|VJwLHvpT9j zmh0lU>hHn}P6`AYC<6!ij5K~wrt%ko3||~;E9px~`U(TV$b)=9UPUQENNVS_5As4?0^WGB5H{qQG_WgB_SYlIj9)0Lwald#{QjG zTu)?biNs!|Qy;zOkxM4sK#-s?8#Sr4tX8GUq5tfaT7K#sdkj7Q4!Jth5*B;4ZoKE8 z!sVa*3&Co506&zk(-7%!cwe3$N|OIa+2(?f_Hp%&7!VRgPko!Q(yK){x+Z&umA2OK+&4k2rYEz;V?j{kU-CV;P3XEoA6{w!#IV4ixvyzdYZj3|1 zL|z%T1^P|zyjV9}MZ2z5i`--dp)RB8;3h)`rfv4eOt)VD>ScEYpP5010!L`L_i%6D z^5*(8)NX0g+cosGnXdxiW*@2!iq9k7J3IkFJka9e!S z3xT7qHJa2B8g8_ufA-zq-@5_{4L8_6e*HyPzVVBFa2gah)~*Wt?CZ}RjD!|N?V=mk zgqB?VgZVhB#Zb}O*Z<#lZu=9GYH?JwOaFW60mpyb7!Vxw&;%;lm#!@Q@QtO0YOo$Y zvb1buZE7|zD)2ofIsKna!shX8+gUvMaS~>Bsc>@+xSe&!t5Y8$R8aV#(uz@<23Y%k z;;zS>W1*|Hc;Pt+YT=HS5L)N=Y(r2Bccg?yocj42zwf*eaYso|>p!oUVRS^>A&)yk zf?7MDTl^qV%dS75ht}G&{>KO{yHx0rHNQ(V`k`Ga?7;Foxb}O_wZe@nxPkk@8{aT` zpCud*Q|GbI(Sjr=aATNbvtyy zBfpw^%0)X30OWbi*Arw?2Hv z*8+zbx;b(;+;F@3?4Rdfd)koA5gKl^U31OCd%tt|`6SvLq2UHw&zz6_`u)R@RKtz6 zcON?G!)4o%VhuML2EOPaTa@5V%8}dNS_|qm zgHB1Qaz;Vb^PzuTaQDy!1%i3IJ_LsV=yY}Rd@9rDc_eRsBq!99Ywp$sM-oV zD&(6xg0t>eAQMQG_yov|ES`l5+IjL1Z@#$wCIX7m@IqGEtnGNfjnQ*|F5Pp|&yX8l z$Y)XYyz2hK%h${>AaAi3owLo+*)9rv8kO?$Z!X)BK0QKO9VI-q-TzW?7!`ESdFzh- z)`N!-&~Mw5b04#u(=*wWENVA87eu=AD_d;?-7x#q6R*GfeKIPcjGmcnPRt0XsviIN z_CJ2Gte>a~ca++((7ny7avG;Htf*ohmG!IUx!ZTZ_nzq-oIr=#=`Y+E{?vJ| zZY88BIbH!dXC7L0!FzY!(Lo?lhI)WR_N2s?sVC2GL#Bl9+E5$(cL#oQ|Lu!56H551 z%1&s)0n^5OZjyu>lsis+>V@dyBM9T&_89j!GjyyfVY+_b8o7D?Ol6-d}j?o+YO!q~TFIlL|Fd{31#FW8tCp z|1tzciBVM?Qnl$Fs;FmY0hB3o zNhp@jS;9?J-hE)!szp1FBCII$rjqt;JZbPEo2Q19|k`f9t$he#?tS zEf-}nGK4%C+2hV<=6=7YkF+@o8j_GKzM_6tTtHrPriHU;S3hb;t!-2`i$V2-PFfdx4XB9u?kHv~lJs(}ey0Xwi zEXB$A#b8;dUb>s9;?9-ToO;BDmz#`gEVYPC7I8@1!kJ07J1z2(4PkQ9S6M93)VHRS z%mja3rk>7;dMuV>>iKS>iZ~ZmVd??ld@PDCQJX>GF20Z+;iQ)$FSQURC&RsVU?A!w zyS@!!%PmD4GF?xpM}xXA1)7V?DD|LDIx3UZlX^fW={lQ5lX^lY{T6wxg!Ee{se-ykEN~Yo zh^dqsoLyQOL*blrJ_Sb;BPQzPv5n4HuMUmLqt~rGCJJY6mlVuN0(th<1{a&vJ-dA3 zFpC!-Zg5Le&BNKfcT3HaWAxhVR(^_Ma}RL`nhvYnyL?z>Wj9I^9?eeh3mRR?MH+j< zs0C@D8<2D?H?*ioUXEzJE%H!GjUsLCWr?Y!< z)6T9vnMr5&EVJ0zD+H`MyXQ^9(eQ+R5`fyjh^mr=L<=*^t9Z_tGQj}v8+n7F<#;W^PCtbaf*2;j1vyb6JVThU>^5EIH3GC z^7@r(7E)ahWSNKPiP6qe5}V0n%oY;n0IVZ*w>`=eBv`Ywt`d{%L}$ABm^ZTynb39m z>O;rf>yQa^K+<)3`Ni8dIb^~doXR17_xVoa33H$-Gd=O9Q=KpeEuCrGrX6RJwzFjg z=;7oEU2e!^3kf}%R@_oDqRAE#dN92_H||g+^jPX1>~p9RW-e6ja#w$3Sh%f*!%T~G zq!sh?4pqX;kjh9apK+)XX0oIsy&OE%Q0ekExTV&(;cQ{0ZmoYLo78WONGfNQL~)Bd zB^>)WnCSq?DX*Ybhe^tW!86|U!5rH#^uZk1FnBOWH4GliA&v0Z@9s>($36~|=&_H( zBzWxOjI-FmOob3a88Zz+z`;y`5O6TlAA-|plTdXNWwP?+mMF`T%%b!U8vqxXQm(!9 zl*_zBG6}~I7V)0U&E(qE;iBlB%BPE2bqm+YwQEO9z-_JMBIPhy%=>&Bx37tAmQL4l zYh(vEDO>bT8qkbq@+K+eouU6dai}QCswi`Na8n?QEQ#Jp@(|bj1WM9KtVvXtNHn_{ zADXl#v98H)CpC%_?R*Vb_|Hi2&YU3S`D{V^CGXUw1&FvqsDgSnH!TlmZf7A@$JM4v1c z;Q~C~=l8{n1DRr++mD+ji#d^3MDOg#@<3#{-rY(Uy`dy_pCk=*2g2QfP;bcJ>yLMD+_0)U z=nsZS7c`D+Xta^0ytg(fN%`dFY!367Ez;l4;SMl${5g6Dt~|Ebm*Gn}m8$ug_i}9} zx^3Houf0erWDRd{C(qCGn^+*gePma^M7*!j#oL6S*q6oJHdU?;tbOEkB0y99i+xfF zTYt}KSHJWf^7uwqomXF887!i{Jc&8ANzB1Rao?FdwQjF$WpOA8hfq`x|F3LU!8>@3 zJbSp-HmlFh^Fzsupdj}!ZL6lAwD1P^L9WwAR*s=K{_scdg3?fwAd1ww~E%;$7?A*?#TGwvV zn%I_&#s(KJl;FHR1yZzRCYSB&M;}taaEwF-uxbv#ZHZOhB)A9H4$UZfCkOq0#6D = + Pool::new(256, PromWriteRequest::default); +} #[derive(Debug, Serialize, Deserialize, JsonSchema)] pub struct DatabaseQuery { @@ -91,14 +100,15 @@ pub async fn remote_write( .with_label_values(&[db.as_str()]) .start_timer(); - let request = decode_remote_write_request(body).await?; + let request = decode_remote_write_request_to_row_inserts(body).await?; + if let Some(physical_table) = params.physical_table { let mut new_query_ctx = query_ctx.as_ref().clone(); new_query_ctx.set_extension(PHYSICAL_TABLE_PARAM, physical_table); query_ctx = Arc::new(new_query_ctx); } - handler.write(request, query_ctx, true).await?; + handler.write_fast(request, query_ctx, true).await?; Ok((StatusCode::NO_CONTENT, ())) } @@ -136,6 +146,23 @@ pub async fn remote_read( handler.read(request, query_ctx).await } +async fn decode_remote_write_request_to_row_inserts(body: Body) -> Result { + let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); + let body = hyper::body::to_bytes(body) + .await + .context(error::HyperSnafu)?; + + let buf = Bytes::from(snappy_decompress(&body[..])?); + + let mut request = PROM_WRITE_REQUEST_POOL.pull(PromWriteRequest::default); + request + .merge(buf) + .context(error::DecodePromRemoteRequestSnafu)?; + let (requests, samples) = request.as_row_insert_requests(); + crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_NUM_SERIES.observe(samples as f64); + Ok(requests) +} + async fn decode_remote_write_request(body: Body) -> Result { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_DECODE_ELAPSED.start_timer(); let body = hyper::body::to_bytes(body) diff --git a/src/servers/src/lib.rs b/src/servers/src/lib.rs index 4d04661c5eb..efa6084eae8 100644 --- a/src/servers/src/lib.rs +++ b/src/servers/src/lib.rs @@ -36,9 +36,13 @@ pub mod mysql; pub mod opentsdb; pub mod otlp; pub mod postgres; +mod prom_row_builder; pub mod prom_store; pub mod prometheus_handler; +pub mod proto; pub mod query_handler; +#[allow(clippy::all)] +mod repeated_field; mod row_writer; pub mod server; mod shutdown; diff --git a/src/servers/src/prom_row_builder.rs b/src/servers/src/prom_row_builder.rs new file mode 100644 index 00000000000..20a049f472b --- /dev/null +++ b/src/servers/src/prom_row_builder.rs @@ -0,0 +1,272 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::hash_map::Entry; +use std::collections::HashMap; +use std::string::ToString; + +use api::prom_store::remote::Sample; +use api::v1::value::ValueData; +use api::v1::{ + ColumnDataType, ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, SemanticType, + Value, +}; +use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE}; + +use crate::proto::PromLabel; +use crate::repeated_field::Clear; + +/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests]. +#[derive(Default)] +pub(crate) struct TablesBuilder { + tables: HashMap, +} + +impl Clear for TablesBuilder { + fn clear(&mut self) { + self.tables.clear(); + } +} + +impl TablesBuilder { + /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist. + pub(crate) fn get_or_create_table_builder( + &mut self, + table_name: String, + label_num: usize, + row_num: usize, + ) -> &mut TableBuilder { + self.tables + .entry(table_name) + .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num)) + } + + /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states. + pub(crate) fn as_insert_requests(&mut self) -> (RowInsertRequests, usize) { + let mut total_rows = 0; + let inserts = self + .tables + .drain() + .map(|(name, mut table)| { + total_rows += table.num_rows(); + table.as_row_insert_request(name) + }) + .collect(); + (RowInsertRequests { inserts }, total_rows) + } +} + +/// Builder for one table. +pub(crate) struct TableBuilder { + /// Column schemas. + schema: Vec, + /// Rows written. + rows: Vec, + /// Indices of columns inside `schema`. + col_indexes: HashMap, +} + +impl Default for TableBuilder { + fn default() -> Self { + Self::with_capacity(2, 0) + } +} + +impl TableBuilder { + pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self { + let mut col_indexes = HashMap::with_capacity(cols); + col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0); + col_indexes.insert(GREPTIME_VALUE.to_string(), 1); + + let mut schema = Vec::with_capacity(cols); + schema.push(ColumnSchema { + column_name: GREPTIME_TIMESTAMP.to_string(), + datatype: ColumnDataType::TimestampMillisecond as i32, + semantic_type: SemanticType::Timestamp as i32, + datatype_extension: None, + }); + + schema.push(ColumnSchema { + column_name: GREPTIME_VALUE.to_string(), + datatype: ColumnDataType::Float64 as i32, + semantic_type: SemanticType::Field as i32, + datatype_extension: None, + }); + + Self { + schema, + rows: Vec::with_capacity(rows), + col_indexes, + } + } + + /// Total number of rows inside table builder. + fn num_rows(&self) -> usize { + self.rows.len() + } + + /// Adds a set of labels and samples to table builder. + pub(crate) fn add_labels_and_samples(&mut self, labels: &[PromLabel], samples: &[Sample]) { + let mut row = vec![Value { value_data: None }; self.col_indexes.len()]; + + for PromLabel { name, value } in labels { + // safety: we expect all labels are UTF-8 encoded strings. + let tag_name = unsafe { String::from_utf8_unchecked(name.to_vec()) }; + let tag_value = unsafe { String::from_utf8_unchecked(value.to_vec()) }; + let tag_value = Some(ValueData::StringValue(tag_value)); + let tag_num = self.col_indexes.len(); + + match self.col_indexes.entry(tag_name) { + Entry::Occupied(e) => { + row[*e.get()].value_data = tag_value; + } + Entry::Vacant(e) => { + let column_name = e.key().clone(); + e.insert(tag_num); + self.schema.push(ColumnSchema { + column_name, + datatype: ColumnDataType::String as i32, + semantic_type: SemanticType::Tag as i32, + datatype_extension: None, + }); + row.push(Value { + value_data: tag_value, + }); + } + } + } + + if samples.len() == 1 { + let sample = &samples[0]; + row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp)); + row[1].value_data = Some(ValueData::F64Value(sample.value)); + self.rows.push(Row { values: row }); + return; + } + for sample in samples { + row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp)); + row[1].value_data = Some(ValueData::F64Value(sample.value)); + self.rows.push(Row { + values: row.clone(), + }); + } + } + + /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data. + pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest { + let mut rows = std::mem::take(&mut self.rows); + let schema = std::mem::take(&mut self.schema); + let col_num = schema.len(); + for row in &mut rows { + if row.values.len() < col_num { + row.values.resize(col_num, Value { value_data: None }); + } + } + + RowInsertRequest { + table_name, + rows: Some(Rows { schema, rows }), + } + } +} + +#[cfg(test)] +mod tests { + use api::prom_store::remote::Sample; + use api::v1::value::ValueData; + use api::v1::Value; + use bytes::Bytes; + + use crate::prom_row_builder::TableBuilder; + use crate::proto::PromLabel; + #[test] + fn test_table_builder() { + let mut builder = TableBuilder::default(); + builder.add_labels_and_samples( + &[ + PromLabel { + name: Bytes::from("tag0"), + value: Bytes::from("v0"), + }, + PromLabel { + name: Bytes::from("tag1"), + value: Bytes::from("v1"), + }, + ], + &[Sample { + value: 0.0, + timestamp: 0, + }], + ); + + builder.add_labels_and_samples( + &[ + PromLabel { + name: Bytes::from("tag0"), + value: Bytes::from("v0"), + }, + PromLabel { + name: Bytes::from("tag2"), + value: Bytes::from("v2"), + }, + ], + &[Sample { + value: 0.1, + timestamp: 1, + }], + ); + + let request = builder.as_row_insert_request("test".to_string()); + let rows = request.rows.unwrap().rows; + assert_eq!(2, rows.len()); + + assert_eq!( + vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(0)) + }, + Value { + value_data: Some(ValueData::F64Value(0.0)) + }, + Value { + value_data: Some(ValueData::StringValue("v0".to_string())) + }, + Value { + value_data: Some(ValueData::StringValue("v1".to_string())) + }, + Value { value_data: None }, + ], + rows[0].values + ); + + assert_eq!( + vec![ + Value { + value_data: Some(ValueData::TimestampMillisecondValue(1)) + }, + Value { + value_data: Some(ValueData::F64Value(0.1)) + }, + Value { + value_data: Some(ValueData::StringValue("v0".to_string())) + }, + Value { value_data: None }, + Value { + value_data: Some(ValueData::StringValue("v2".to_string())) + }, + ], + rows[1].values + ); + } +} diff --git a/src/servers/src/prom_store.rs b/src/servers/src/prom_store.rs index f86d30781c8..7553d979122 100644 --- a/src/servers/src/prom_store.rs +++ b/src/servers/src/prom_store.rs @@ -39,6 +39,8 @@ use crate::row_writer::{self, MultiTableData}; pub const METRIC_NAME_LABEL: &str = "__name__"; +pub const METRIC_NAME_LABEL_BYTES: &[u8] = b"__name__"; + /// Metrics for push gateway protocol pub struct Metrics { pub exposition: MetricsExposition, @@ -300,12 +302,12 @@ fn recordbatch_to_timeseries(table: &str, recordbatch: RecordBatch) -> Result Result<(RowInsertRequests, usize)> { +pub fn to_grpc_row_insert_requests(request: &WriteRequest) -> Result<(RowInsertRequests, usize)> { let _timer = crate::metrics::METRIC_HTTP_PROM_STORE_CONVERT_ELAPSED.start_timer(); let mut multi_table_data = MultiTableData::new(); - for series in request.timeseries { + for series in &request.timeseries { let table_name = &series .labels .iter() @@ -329,11 +331,11 @@ pub fn to_grpc_row_insert_requests(request: WriteRequest) -> Result<(RowInsertRe ); // labels - let kvs = series.labels.into_iter().filter_map(|label| { + let kvs = series.labels.iter().filter_map(|label| { if label.name == METRIC_NAME_LABEL { None } else { - Some((label.name, label.value)) + Some((label.name.clone(), label.value.clone())) } }); @@ -649,7 +651,7 @@ mod tests { ..Default::default() }; - let mut exprs = to_grpc_row_insert_requests(write_request) + let mut exprs = to_grpc_row_insert_requests(&write_request) .unwrap() .0 .inserts; diff --git a/src/servers/src/proto.rs b/src/servers/src/proto.rs new file mode 100644 index 00000000000..1a96cd9ed8b --- /dev/null +++ b/src/servers/src/proto.rs @@ -0,0 +1,304 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::ops::Deref; + +use api::prom_store::remote::Sample; +use api::v1::RowInsertRequests; +use bytes::{Buf, Bytes}; +use prost::encoding::message::merge; +use prost::encoding::{decode_key, decode_varint, DecodeContext, WireType}; +use prost::DecodeError; + +use crate::prom_row_builder::TablesBuilder; +use crate::prom_store::METRIC_NAME_LABEL_BYTES; +use crate::repeated_field::{Clear, RepeatedField}; + +impl Clear for Sample { + fn clear(&mut self) {} +} + +#[derive(Default, Clone)] +pub struct PromLabel { + pub name: Bytes, + pub value: Bytes, +} + +impl Clear for PromLabel { + fn clear(&mut self) { + self.name.clear(); + self.value.clear(); + } +} + +impl PromLabel { + pub fn merge_field( + &mut self, + tag: u32, + wire_type: WireType, + buf: &mut B, + ctx: DecodeContext, + ) -> Result<(), DecodeError> + where + B: Buf, + { + const STRUCT_NAME: &str = "PromLabel"; + match tag { + 1u32 => { + // decode label name + let value = &mut self.name; + prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| { + error.push(STRUCT_NAME, "name"); + error + }) + } + 2u32 => { + // decode label value + let value = &mut self.value; + prost::encoding::bytes::merge(wire_type, value, buf, ctx).map_err(|mut error| { + error.push(STRUCT_NAME, "value"); + error + }) + } + _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + } + } +} + +#[derive(Default)] +pub struct PromTimeSeries { + pub table_name: String, + pub labels: RepeatedField, + pub samples: RepeatedField, +} + +impl Clear for PromTimeSeries { + fn clear(&mut self) { + self.table_name.clear(); + self.labels.clear(); + self.samples.clear(); + } +} + +impl PromTimeSeries { + pub fn merge_field( + &mut self, + tag: u32, + wire_type: WireType, + buf: &mut B, + ctx: DecodeContext, + ) -> Result<(), DecodeError> + where + B: Buf, + { + const STRUCT_NAME: &str = "PromTimeSeries"; + match tag { + 1u32 => { + // decode labels + let label = self.labels.push_default(); + + let len = decode_varint(buf).map_err(|mut error| { + error.push(STRUCT_NAME, "labels"); + error + })?; + let remaining = buf.remaining(); + if len > remaining as u64 { + return Err(DecodeError::new("buffer underflow")); + } + + let limit = remaining - len as usize; + while buf.remaining() > limit { + let (tag, wire_type) = decode_key(buf)?; + label.merge_field(tag, wire_type, buf, ctx.clone())?; + } + if buf.remaining() != limit { + return Err(DecodeError::new("delimited length exceeded")); + } + if label.name.deref() == METRIC_NAME_LABEL_BYTES { + // safety: we expect all labels are UTF-8 encoded strings. + let table_name = unsafe { String::from_utf8_unchecked(label.value.to_vec()) }; + self.table_name = table_name; + self.labels.truncate(self.labels.len() - 1); // remove last label + } + Ok(()) + } + 2u32 => { + let sample = self.samples.push_default(); + merge(WireType::LengthDelimited, sample, buf, ctx).map_err(|mut error| { + error.push(STRUCT_NAME, "samples"); + error + })?; + Ok(()) + } + // skip exemplars + 3u32 => prost::encoding::skip_field(wire_type, tag, buf, ctx), + _ => prost::encoding::skip_field(wire_type, tag, buf, ctx), + } + } + + fn add_to_table_data(&mut self, table_builders: &mut TablesBuilder) { + let label_num = self.labels.len(); + let row_num = self.samples.len(); + let table_data = table_builders.get_or_create_table_builder( + std::mem::take(&mut self.table_name), + label_num, + row_num, + ); + table_data.add_labels_and_samples(self.labels.as_slice(), self.samples.as_slice()); + self.labels.clear(); + self.samples.clear(); + } +} + +#[derive(Default)] +pub struct PromWriteRequest { + table_data: TablesBuilder, + series: PromTimeSeries, +} + +impl Clear for PromWriteRequest { + fn clear(&mut self) { + self.table_data.clear(); + } +} + +impl PromWriteRequest { + pub fn as_row_insert_requests(&mut self) -> (RowInsertRequests, usize) { + self.table_data.as_insert_requests() + } + + pub fn merge(&mut self, mut buf: B) -> Result<(), DecodeError> + where + B: Buf, + Self: Sized, + { + const STRUCT_NAME: &str = "PromWriteRequest"; + let ctx = DecodeContext::default(); + while buf.has_remaining() { + let (tag, wire_type) = decode_key(&mut buf)?; + assert_eq!(WireType::LengthDelimited, wire_type); + match tag { + 1u32 => { + // decode TimeSeries + let len = decode_varint(&mut buf).map_err(|mut e| { + e.push(STRUCT_NAME, "timeseries"); + e + })?; + let remaining = buf.remaining(); + if len > remaining as u64 { + return Err(DecodeError::new("buffer underflow")); + } + + let limit = remaining - len as usize; + while buf.remaining() > limit { + let (tag, wire_type) = decode_key(&mut buf)?; + self.series + .merge_field(tag, wire_type, &mut buf, ctx.clone())?; + } + if buf.remaining() != limit { + return Err(DecodeError::new("delimited length exceeded")); + } + self.series.add_to_table_data(&mut self.table_data); + } + 3u32 => { + // we can ignore metadata for now. + prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?; + } + _ => prost::encoding::skip_field(wire_type, tag, &mut buf, ctx.clone())?, + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::collections::{HashMap, HashSet}; + + use api::prom_store::remote::WriteRequest; + use api::v1::RowInsertRequests; + use bytes::Bytes; + use prost::Message; + + use crate::prom_store::to_grpc_row_insert_requests; + use crate::proto::PromWriteRequest; + use crate::repeated_field::Clear; + + fn check_deserialized( + prom_write_request: &mut PromWriteRequest, + data: &Bytes, + expected_samples: usize, + expected_rows: &RowInsertRequests, + ) { + prom_write_request.clear(); + prom_write_request.merge(data.clone()).unwrap(); + let (prom_rows, samples) = prom_write_request.as_row_insert_requests(); + + assert_eq!(expected_samples, samples); + assert_eq!(expected_rows.inserts.len(), prom_rows.inserts.len()); + + let schemas = expected_rows + .inserts + .iter() + .map(|r| { + ( + r.table_name.clone(), + r.rows + .as_ref() + .unwrap() + .schema + .iter() + .map(|c| (c.column_name.clone(), c.datatype, c.semantic_type)) + .collect::>(), + ) + }) + .collect::>(); + + for r in &prom_rows.inserts { + let expected = schemas.get(&r.table_name).unwrap(); + assert_eq!( + expected, + &r.rows + .as_ref() + .unwrap() + .schema + .iter() + .map(|c| { (c.column_name.clone(), c.datatype, c.semantic_type) }) + .collect() + ); + } + } + + // Ensures `WriteRequest` and `PromWriteRequest` produce the same gRPC request. + #[test] + fn test_decode_write_request() { + let mut d = std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR")); + d.push("benches"); + d.push("write_request.pb.data"); + let data = Bytes::from(std::fs::read(d).unwrap()); + + let (expected_rows, expected_samples) = + to_grpc_row_insert_requests(&WriteRequest::decode(data.clone()).unwrap()).unwrap(); + + let mut prom_write_request = PromWriteRequest::default(); + for _ in 0..3 { + check_deserialized( + &mut prom_write_request, + &data, + expected_samples, + &expected_rows, + ); + } + } +} diff --git a/src/servers/src/query_handler.rs b/src/servers/src/query_handler.rs index d36b7418b02..347ec524565 100644 --- a/src/servers/src/query_handler.rs +++ b/src/servers/src/query_handler.rs @@ -29,6 +29,7 @@ use std::collections::HashMap; use std::sync::Arc; use api::prom_store::remote::{ReadRequest, WriteRequest}; +use api::v1::RowInsertRequests; use async_trait::async_trait; use common_query::Output; use opentelemetry_proto::tonic::collector::metrics::v1::{ @@ -95,6 +96,15 @@ pub trait PromStoreProtocolHandler { ctx: QueryContextRef, with_metric_engine: bool, ) -> Result<()>; + + /// Handling prometheus remote write requests + async fn write_fast( + &self, + request: RowInsertRequests, + ctx: QueryContextRef, + with_metric_engine: bool, + ) -> Result<()>; + /// Handling prometheus remote read requests async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result; /// Handling push gateway requests diff --git a/src/servers/src/repeated_field.rs b/src/servers/src/repeated_field.rs new file mode 100644 index 00000000000..0e3baf16a57 --- /dev/null +++ b/src/servers/src/repeated_field.rs @@ -0,0 +1,540 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Copyright (c) 2019 Stepan Koltsov +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in all +// copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +// EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +// MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. +// IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, +// DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR +// OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE +// OR OTHER DEALINGS IN THE SOFTWARE. + +/// ! The [Clear] trait and [RepeatedField] are taken from [rust-protobuf](https://github.com/stepancheg/rust-protobuf/tree/master/protobuf-examples/vs-prost) +/// to leverage the pooling mechanism to avoid frequent heap allocation/deallocation when decoding deeply nested structs. +use std::borrow::Borrow; +use std::cmp::Ordering; +use std::default::Default; +use std::hash::{Hash, Hasher}; +use std::iter::{FromIterator, IntoIterator}; +use std::ops::{Deref, DerefMut, Index, IndexMut}; +use std::{fmt, slice, vec}; + +use bytes::Bytes; + +/// anything that can be cleared +pub trait Clear { + /// Clear this make, make it equivalent to newly created object. + fn clear(&mut self); +} + +impl Clear for Option { + fn clear(&mut self) { + self.take(); + } +} + +impl Clear for String { + fn clear(&mut self) { + String::clear(self); + } +} + +impl Clear for Vec { + fn clear(&mut self) { + Vec::clear(self); + } +} + +impl Clear for Bytes { + fn clear(&mut self) { + Bytes::clear(self); + } +} + +/// Wrapper around vector to avoid deallocations on clear. +pub struct RepeatedField { + vec: Vec, + len: usize, +} + +impl RepeatedField { + /// Return number of elements in this container. + #[inline] + pub fn len(&self) -> usize { + self.len + } + + /// Clear. + #[inline] + pub fn clear(&mut self) { + self.len = 0; + } +} + +impl Default for RepeatedField { + #[inline] + fn default() -> RepeatedField { + RepeatedField { + vec: Vec::new(), + len: 0, + } + } +} + +impl RepeatedField { + /// Create new empty container. + #[inline] + pub fn new() -> RepeatedField { + Default::default() + } + + /// Create a contained with data from given vec. + #[inline] + pub fn from_vec(vec: Vec) -> RepeatedField { + let len = vec.len(); + RepeatedField { vec, len } + } + + /// Convert data into vec. + #[inline] + pub fn into_vec(self) -> Vec { + let mut vec = self.vec; + vec.truncate(self.len); + vec + } + + /// Return current capacity. + #[inline] + pub fn capacity(&self) -> usize { + self.vec.capacity() + } + + /// View data as slice. + #[inline] + pub fn as_slice<'a>(&'a self) -> &'a [T] { + &self.vec[..self.len] + } + + /// View data as mutable slice. + #[inline] + pub fn as_mut_slice<'a>(&'a mut self) -> &'a mut [T] { + &mut self.vec[..self.len] + } + + /// Get subslice of this container. + #[inline] + pub fn slice(&self, start: usize, end: usize) -> &[T] { + &self.as_ref()[start..end] + } + + /// Get mutable subslice of this container. + #[inline] + pub fn slice_mut(&mut self, start: usize, end: usize) -> &mut [T] { + &mut self.as_mut_slice()[start..end] + } + + /// Get slice from given index. + #[inline] + pub fn slice_from(&self, start: usize) -> &[T] { + &self.as_ref()[start..] + } + + /// Get mutable slice from given index. + #[inline] + pub fn slice_from_mut(&mut self, start: usize) -> &mut [T] { + &mut self.as_mut_slice()[start..] + } + + /// Get slice to given index. + #[inline] + pub fn slice_to(&self, end: usize) -> &[T] { + &self.as_ref()[..end] + } + + /// Get mutable slice to given index. + #[inline] + pub fn slice_to_mut(&mut self, end: usize) -> &mut [T] { + &mut self.as_mut_slice()[..end] + } + + /// View this container as two slices split at given index. + #[inline] + pub fn split_at<'a>(&'a self, mid: usize) -> (&'a [T], &'a [T]) { + self.as_ref().split_at(mid) + } + + /// View this container as two mutable slices split at given index. + #[inline] + pub fn split_at_mut<'a>(&'a mut self, mid: usize) -> (&'a mut [T], &'a mut [T]) { + self.as_mut_slice().split_at_mut(mid) + } + + /// View all but first elements of this container. + #[inline] + pub fn tail(&self) -> &[T] { + &self.as_ref()[1..] + } + + /// Last element of this container. + #[inline] + pub fn last(&self) -> Option<&T> { + self.as_ref().last() + } + + /// Mutable last element of this container. + #[inline] + pub fn last_mut<'a>(&'a mut self) -> Option<&'a mut T> { + self.as_mut_slice().last_mut() + } + + /// View all but last elements of this container. + #[inline] + pub fn init<'a>(&'a self) -> &'a [T] { + let s = self.as_ref(); + &s[0..s.len() - 1] + } + + /// Push an element to the end. + #[inline] + pub fn push(&mut self, value: T) { + if self.len == self.vec.len() { + self.vec.push(value); + } else { + self.vec[self.len] = value; + } + self.len += 1; + } + + /// Pop last element. + #[inline] + pub fn pop(&mut self) -> Option { + if self.len == 0 { + None + } else { + self.vec.truncate(self.len); + self.len -= 1; + self.vec.pop() + } + } + + /// Insert an element at specified position. + #[inline] + pub fn insert(&mut self, index: usize, value: T) { + assert!(index <= self.len); + self.vec.insert(index, value); + self.len += 1; + } + + /// Remove an element from specified position. + #[inline] + pub fn remove(&mut self, index: usize) -> T { + assert!(index < self.len); + self.len -= 1; + self.vec.remove(index) + } + + /// Retains only the elements specified by the predicate. + /// + /// In other words, remove all elements `e` such that `f(&e)` returns `false`. + /// This method operates in place, visiting each element exactly once in the + /// original order, and preserves the order of the retained elements. + /// + /// # Examples + /// + /// ``` + /// # use protobuf::RepeatedField; + /// + /// let mut vec = RepeatedField::from(vec![1, 2, 3, 4]); + /// vec.retain(|&x| x % 2 == 0); + /// assert_eq!(vec, RepeatedField::from(vec![2, 4])); + /// ``` + pub fn retain(&mut self, f: F) + where + F: FnMut(&T) -> bool, + { + // suboptimal + self.vec.truncate(self.len); + self.vec.retain(f); + self.len = self.vec.len(); + } + + /// Truncate at specified length. + #[inline] + pub fn truncate(&mut self, len: usize) { + if self.len > len { + self.len = len; + } + } + + /// Reverse in place. + #[inline] + pub fn reverse(&mut self) { + self.as_mut_slice().reverse() + } + + /// Into owned iterator. + #[inline] + pub fn into_iter(mut self) -> vec::IntoIter { + self.vec.truncate(self.len); + self.vec.into_iter() + } + + /// Immutable data iterator. + #[inline] + pub fn iter<'a>(&'a self) -> slice::Iter<'a, T> { + self.as_ref().iter() + } + + /// Mutable data iterator. + #[inline] + pub fn iter_mut<'a>(&'a mut self) -> slice::IterMut<'a, T> { + self.as_mut_slice().iter_mut() + } + + /// Sort elements with given comparator. + #[inline] + pub fn sort_by(&mut self, compare: F) + where + F: Fn(&T, &T) -> Ordering, + { + self.as_mut_slice().sort_by(compare) + } + + /// Get data as raw pointer. + #[inline] + pub fn as_ptr(&self) -> *const T { + self.vec.as_ptr() + } + + /// Get data a mutable raw pointer. + #[inline] + pub fn as_mut_ptr(&mut self) -> *mut T { + self.vec.as_mut_ptr() + } +} + +impl RepeatedField { + /// Push default value. + /// This operation could be faster than `rf.push(Default::default())`, + /// because it may reuse previously allocated and cleared element. + pub fn push_default<'a>(&'a mut self) -> &'a mut T { + if self.len == self.vec.len() { + self.vec.push(Default::default()); + } else { + self.vec[self.len].clear(); + } + self.len += 1; + self.last_mut().unwrap() + } +} + +impl From> for RepeatedField { + #[inline] + fn from(values: Vec) -> RepeatedField { + RepeatedField::from_vec(values) + } +} + +impl<'a, T: Clone> From<&'a [T]> for RepeatedField { + #[inline] + fn from(values: &'a [T]) -> RepeatedField { + RepeatedField::from_slice(values) + } +} + +impl Into> for RepeatedField { + #[inline] + fn into(self) -> Vec { + self.into_vec() + } +} + +impl RepeatedField { + /// Copy slice data to `RepeatedField` + #[inline] + pub fn from_slice(values: &[T]) -> RepeatedField { + RepeatedField::from_vec(values.to_vec()) + } + + /// Copy slice data to `RepeatedField` + #[inline] + pub fn from_ref>(values: X) -> RepeatedField { + RepeatedField::from_slice(values.as_ref()) + } + + /// Copy this data into new vec. + #[inline] + pub fn to_vec(&self) -> Vec { + self.as_ref().to_vec() + } +} + +impl Clone for RepeatedField { + #[inline] + fn clone(&self) -> RepeatedField { + RepeatedField { + vec: self.to_vec(), + len: self.len(), + } + } +} + +impl FromIterator for RepeatedField { + #[inline] + fn from_iter>(iter: I) -> RepeatedField { + RepeatedField::from_vec(FromIterator::from_iter(iter)) + } +} + +impl<'a, T> IntoIterator for &'a RepeatedField { + type Item = &'a T; + type IntoIter = slice::Iter<'a, T>; + + fn into_iter(self) -> slice::Iter<'a, T> { + self.iter() + } +} + +impl<'a, T> IntoIterator for &'a mut RepeatedField { + type Item = &'a mut T; + type IntoIter = slice::IterMut<'a, T>; + + fn into_iter(self) -> slice::IterMut<'a, T> { + self.iter_mut() + } +} + +impl<'a, T> IntoIterator for RepeatedField { + type Item = T; + type IntoIter = vec::IntoIter; + + fn into_iter(self) -> vec::IntoIter { + self.into_iter() + } +} + +impl PartialEq for RepeatedField { + #[inline] + fn eq(&self, other: &RepeatedField) -> bool { + self.as_ref() == other.as_ref() + } +} + +impl Eq for RepeatedField {} + +impl PartialEq<[T]> for RepeatedField { + fn eq(&self, other: &[T]) -> bool { + self.as_slice() == other + } +} + +impl PartialEq> for [T] { + fn eq(&self, other: &RepeatedField) -> bool { + self == other.as_slice() + } +} + +impl RepeatedField { + /// True iff this container contains given element. + #[inline] + pub fn contains(&self, value: &T) -> bool { + self.as_ref().contains(value) + } +} + +impl Hash for RepeatedField { + fn hash(&self, state: &mut H) { + self.as_ref().hash(state); + } +} + +impl AsRef<[T]> for RepeatedField { + #[inline] + fn as_ref<'a>(&'a self) -> &'a [T] { + &self.vec[..self.len] + } +} + +impl Borrow<[T]> for RepeatedField { + #[inline] + fn borrow(&self) -> &[T] { + &self.vec[..self.len] + } +} + +impl Deref for RepeatedField { + type Target = [T]; + #[inline] + fn deref(&self) -> &[T] { + &self.vec[..self.len] + } +} + +impl DerefMut for RepeatedField { + #[inline] + fn deref_mut(&mut self) -> &mut [T] { + &mut self.vec[..self.len] + } +} + +impl Index for RepeatedField { + type Output = T; + + #[inline] + fn index<'a>(&'a self, index: usize) -> &'a T { + &self.as_ref()[index] + } +} + +impl IndexMut for RepeatedField { + #[inline] + fn index_mut<'a>(&'a mut self, index: usize) -> &'a mut T { + &mut self.as_mut_slice()[index] + } +} + +impl Extend for RepeatedField { + fn extend>(&mut self, iter: I) { + self.vec.truncate(self.len); + self.vec.extend(iter); + self.len = self.vec.len(); + } +} + +impl<'a, T: Copy + 'a> Extend<&'a T> for RepeatedField { + fn extend>(&mut self, iter: I) { + self.vec.truncate(self.len); + self.vec.extend(iter); + self.len = self.vec.len(); + } +} + +impl fmt::Debug for RepeatedField { + #[inline] + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + self.as_ref().fmt(f) + } +} diff --git a/src/servers/tests/http/prom_store_test.rs b/src/servers/tests/http/prom_store_test.rs index 44564c367b4..f77c7d7cb27 100644 --- a/src/servers/tests/http/prom_store_test.rs +++ b/src/servers/tests/http/prom_store_test.rs @@ -18,6 +18,7 @@ use api::prom_store::remote::{ LabelMatcher, Query, QueryResult, ReadRequest, ReadResponse, WriteRequest, }; use api::v1::greptime_request::Request; +use api::v1::RowInsertRequests; use async_trait::async_trait; use axum::Router; use axum_test_helper::TestClient; @@ -64,6 +65,16 @@ impl PromStoreProtocolHandler for DummyInstance { Ok(()) } + + async fn write_fast( + &self, + _request: RowInsertRequests, + _ctx: QueryContextRef, + _with_metric_engine: bool, + ) -> Result<()> { + Ok(()) + } + async fn read(&self, request: ReadRequest, ctx: QueryContextRef) -> Result { let _ = self .tx @@ -141,6 +152,7 @@ fn make_test_app(tx: mpsc::Sender<(String, Vec)>) -> Router { #[tokio::test] async fn test_prometheus_remote_write_read() { + common_telemetry::init_default_ut_logging(); let (tx, mut rx) = mpsc::channel(100); let app = make_test_app(tx); @@ -219,28 +231,17 @@ async fn test_prometheus_remote_write_read() { requests.push(s); } - assert_eq!(4, requests.len()); + assert_eq!(2, requests.len()); - assert_eq!("public", requests[0].0); - assert_eq!("prometheus", requests[1].0); - assert_eq!("prometheus", requests[2].0); - assert_eq!("public", requests[3].0); - - assert_eq!( - write_request, - WriteRequest::decode(&(requests[0].1)[..]).unwrap() - ); - assert_eq!( - write_request, - WriteRequest::decode(&(requests[1].1)[..]).unwrap() - ); + assert_eq!("prometheus", requests[0].0); + assert_eq!("public", requests[1].0); assert_eq!( read_request, - ReadRequest::decode(&(requests[2].1)[..]).unwrap() + ReadRequest::decode(&(requests[0].1)[..]).unwrap() ); assert_eq!( read_request, - ReadRequest::decode(&(requests[3].1)[..]).unwrap() + ReadRequest::decode(&(requests[1].1)[..]).unwrap() ); }