diff --git a/Cargo.toml b/Cargo.toml index aef34c9..b4de204 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,6 +43,7 @@ bytes = "1.2.1" cfg-if = "1.0.0" futures-core = "0.3.21" futures-util = "0.3.21" +portable-atomic = { version = "0.3.13", features = ["float"] } prost = "0.11.0" prost-derive = "0.11.0" serde = { version = "1.0.143", features = ["derive"] } @@ -68,6 +69,10 @@ required-features = ["mock"] name = "logging" required-features = ["mock"] +[[test]] +name = "metrics" +required-features = ["mock"] + [[example]] name = "simple_trace_report" path = "examples/simple_trace_report.rs" diff --git a/README.md b/README.md index 7cf924a..97211d4 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,14 @@ the context after the span finished. LogRecord is the simple builder for the LogData, which is the Log format of Skywalking. +## Metrics + +### Meter + +- **Counter** API represents a single monotonically increasing counter which automatically collects data and reports to the backend. +- **Gauge** API represents a single numerical value. +- **Histogram** API represents a summary sample observations with customized buckets. + # Example ```rust, no_run @@ -54,6 +62,7 @@ use skywalking::{ logging::{logger::Logger, record::{LogRecord, RecordType}}, reporter::grpc::GrpcReporter, trace::tracer::Tracer, + metrics::{meter::Counter, metricer::Metricer}, }; use std::error::Error; use tokio::signal; @@ -94,6 +103,18 @@ async fn handle_request(tracer: Tracer, logger: Logger) { // Auto report ctx when dropped. } +async fn handle_metric(mut metricer: Metricer) { + let counter = metricer.register( + Counter::new("instance_trace_count") + .add_label("region", "us-west") + .add_label("az", "az-1"), + ); + + metricer.boot().await; + + counter.increment(10.); +} + #[tokio::main] async fn main() -> Result<(), Box> { // Connect to skywalking oap server. @@ -109,7 +130,10 @@ async fn main() -> Result<(), Box> { .spawn(); let tracer = Tracer::new("service", "instance", reporter.clone()); - let logger = Logger::new("service", "instance", reporter); + let logger = Logger::new("service", "instance", reporter.clone()); + let metricer = Metricer::new("service", "instance", reporter); + + handle_metric(metricer).await; handle_request(tracer, logger).await; @@ -167,7 +191,7 @@ For details, please refer to [prost-build:sourcing-protoc](https://docs.rs/prost # Release -The SkyWalking committer(PMC included) could follow [this doc](Release-guide.md) to release an official version. +The SkyWalking committer(PMC included) could follow [this doc](https://github.com/apache/skywalking-rust/blob/master/Release-guide.md) to release an official version. # License diff --git a/build.rs b/build.rs index fa64567..7ef9a20 100644 --- a/build.rs +++ b/build.rs @@ -24,6 +24,7 @@ fn main() -> Result<(), Box> { .compile( &[ "./skywalking-data-collect-protocol/language-agent/Tracing.proto", + "./skywalking-data-collect-protocol/language-agent/Meter.proto", "./skywalking-data-collect-protocol/logging/Logging.proto", ], &["./skywalking-data-collect-protocol"], diff --git a/e2e/data/expected_context.yaml b/e2e/data/expected_context.yaml index 9778a2a..6551e9e 100644 --- a/e2e/data/expected_context.yaml +++ b/e2e/data/expected_context.yaml @@ -146,3 +146,33 @@ logItems: value: DEBUG timestamp: gt 0 serviceName: consumer + +meterItems: +- serviceName: consumer + meterSize: 3 + meters: + - meterId: + name: instance_trace_count + tags: + - name: region + value: us-west + - name: az + value: az-1 + singleValue: 30.0 + - meterId: + name: instance_trace_count + tags: + - name: region + value: us-east + - name: az + value: az-3 + singleValue: 20.0 + - meterId: + name: instance_trace_count + tags: + - name: region + value: us-north + - name: az + value: az-1 + histogramBuckets: [10.0, 20.0, 30.0] + histogramValues: [1, 2, 0] diff --git a/e2e/src/main.rs b/e2e/src/main.rs index f3ffd57..dc88364 100644 --- a/e2e/src/main.rs +++ b/e2e/src/main.rs @@ -25,6 +25,10 @@ use skywalking::{ logger::{self, Logger}, record::{LogRecord, RecordType}, }, + metrics::{ + meter::{Counter, Gauge, Histogram}, + metricer::Metricer, + }, reporter::grpc::GrpcReporter, trace::{ propagation::{ @@ -172,6 +176,33 @@ async fn run_consumer_service(host: [u8; 4]) { } } +fn run_consumer_metric(mut metricer: Metricer) { + let counter = metricer.register( + Counter::new("instance_trace_count") + .add_label("region", "us-west") + .add_label("az", "az-1"), + ); + metricer.register( + Gauge::new("instance_trace_count", || 20.) + .add_label("region", "us-east") + .add_label("az", "az-3"), + ); + let histogram = metricer.register( + Histogram::new("instance_trace_count", vec![10., 20., 30.]) + .add_label("region", "us-north") + .add_label("az", "az-1"), + ); + + counter.increment(10.); + counter.increment(20.); + + histogram.add_value(10.); + histogram.add_value(29.); + histogram.add_value(20.); + + metricer.boot(); +} + #[derive(StructOpt)] #[structopt(name = "basic")] struct Opt { @@ -187,7 +218,8 @@ async fn main() -> Result<(), Box> { if opt.mode == "consumer" { tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone())); - logger::set_global_logger(Logger::new("consumer", "node_0", reporter)); + logger::set_global_logger(Logger::new("consumer", "node_0", reporter.clone())); + run_consumer_metric(Metricer::new("consumer", "node_0", reporter)); run_consumer_service([0, 0, 0, 0]).await; } else if opt.mode == "producer" { tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone())); diff --git a/examples/simple_metric_report.rs b/examples/simple_metric_report.rs new file mode 100644 index 0000000..ef4f9c8 --- /dev/null +++ b/examples/simple_metric_report.rs @@ -0,0 +1,55 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. +// + +use skywalking::{ + metrics::{meter::Counter, metricer::Metricer}, + reporter::grpc::GrpcReporter, +}; +use std::error::Error; +use tokio::signal; + +#[tokio::main] +async fn main() -> Result<(), Box> { + // Connect to skywalking oap server. + let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?; + + // Spawn the reporting in background, with listening the graceful shutdown + // signal. + let handle = reporter + .reporting() + .await + .with_graceful_shutdown(async move { + signal::ctrl_c().await.expect("failed to listen for event"); + }) + .spawn(); + + // Do metrics. + let mut metricer = Metricer::new("service", "instance", reporter.clone()); + let counter = metricer.register( + Counter::new("instance_trace_count") + .add_label("region", "us-west") + .add_label("az", "az-1"), + ); + + counter.increment(1.); + + metricer.boot().await.unwrap(); + handle.await.unwrap(); + + Ok(()) +} diff --git a/src/common/system_time.rs b/src/common/system_time.rs index ee97b4e..6513e81 100644 --- a/src/common/system_time.rs +++ b/src/common/system_time.rs @@ -19,6 +19,7 @@ use cfg_if::cfg_if; pub(crate) enum TimePeriod { Start, Log, + Metric, End, } @@ -28,6 +29,7 @@ cfg_if! { match period { TimePeriod::Start => 1, TimePeriod::Log => 10, + TimePeriod::Metric => 10, TimePeriod::End => 100, } } diff --git a/src/lib.rs b/src/lib.rs index 9cfd58b..7d4cc0c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -20,6 +20,7 @@ pub mod common; pub(crate) mod error; pub mod logging; +pub mod metrics; pub mod reporter; pub mod skywalking_proto; pub mod trace; diff --git a/src/logging/logger.rs b/src/logging/logger.rs index 2571ea7..f1bb2dc 100644 --- a/src/logging/logger.rs +++ b/src/logging/logger.rs @@ -22,8 +22,8 @@ use tokio::sync::OnceCell; static GLOBAL_LOGGER: OnceCell = OnceCell::const_new(); /// Set the global logger. -pub fn set_global_logger(tracer: Logger) { - if GLOBAL_LOGGER.set(tracer).is_err() { +pub fn set_global_logger(logger: Logger) { + if GLOBAL_LOGGER.set(logger).is_err() { panic!("global logger has setted") } } diff --git a/src/logging/record.rs b/src/logging/record.rs index 87f22cf..02f5a83 100644 --- a/src/logging/record.rs +++ b/src/logging/record.rs @@ -55,16 +55,19 @@ impl LogRecord { Default::default() } + #[inline] pub fn custome_time(mut self, time: SystemTime) -> Self { self.time = Some(time); self } + #[inline] pub fn ignore_time(mut self) -> Self { self.is_ignore_time = true; self } + #[inline] pub fn endpoint(mut self, endpoint: impl ToString) -> Self { self.endpoint = endpoint.to_string(); self diff --git a/src/metrics/meter.rs b/src/metrics/meter.rs new file mode 100644 index 0000000..ea97852 --- /dev/null +++ b/src/metrics/meter.rs @@ -0,0 +1,344 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use crate::{ + common::system_time::{fetch_time, TimePeriod}, + metrics::metricer::Metricer, + skywalking_proto::v3::{ + meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, + }, +}; +use portable_atomic::AtomicF64; +use std::{ + cmp::Ordering::Equal, + sync::atomic::{AtomicI64, Ordering}, +}; + +pub trait Transform: Send + Sync { + fn meter_id(&self) -> MeterId; + + fn transform(&self, metricer: &Metricer) -> MeterData; +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub(crate) enum MeterType { + Counter, + Gauge, + Histogram, +} + +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct MeterId { + name: String, + typ: MeterType, + labels: Vec<(String, String)>, +} + +impl MeterId { + fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.labels.push((key.to_string(), value.to_string())); + self + } + + fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.labels.extend( + tags.into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())), + ); + self + } +} + +/// Counter mode. +pub enum CounterMode { + /// INCREMENT mode represents reporting the latest value. + INCREMENT, + + /// RATE mode represents reporting the increment rate. Value = latest value + /// - last reported value. + RATE, +} + +pub struct Counter { + id: MeterId, + mode: CounterMode, + count: AtomicF64, + previous_count: AtomicF64, +} + +impl Counter { + #[inline] + pub fn new(name: impl ToString) -> Self { + Self { + id: MeterId { + name: name.to_string(), + typ: MeterType::Counter, + labels: vec![], + }, + mode: CounterMode::INCREMENT, + count: AtomicF64::new(0.), + previous_count: AtomicF64::new(0.), + } + } + + #[inline] + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.id = self.id.add_label(key, value); + self + } + + #[inline] + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.id = self.id.add_labels(tags); + self + } + + #[inline] + pub fn mode(mut self, mode: CounterMode) -> Self { + self.mode = mode; + self + } + + pub fn increment(&self, count: f64) { + self.count.fetch_add(count, Ordering::Acquire); + } + + pub fn get(&self) -> f64 { + self.count.load(Ordering::Acquire) + } +} + +impl Transform for Counter { + fn meter_id(&self) -> MeterId { + self.id.clone() + } + + fn transform(&self, metricer: &Metricer) -> MeterData { + MeterData { + service: metricer.service_name().to_owned(), + service_instance: metricer.instance_name().to_owned(), + timestamp: fetch_time(TimePeriod::Metric), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: self.id.name.to_owned(), + labels: self + .id + .labels + .iter() + .map(|(name, value)| Label { + name: name.clone(), + value: value.clone(), + }) + .collect(), + value: match self.mode { + CounterMode::INCREMENT => self.get(), + CounterMode::RATE => { + let current_count = self.get(); + let previous_count = + self.previous_count.swap(current_count, Ordering::Acquire); + current_count - previous_count + } + }, + })), + } + } +} + +pub struct Gauge { + id: MeterId, + getter: G, +} + +impl f64> Gauge { + #[inline] + pub fn new(name: impl ToString, getter: G) -> Self { + Self { + id: MeterId { + name: name.to_string(), + typ: MeterType::Gauge, + labels: vec![], + }, + getter, + } + } + + #[inline] + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.id = self.id.add_label(key, value); + self + } + + #[inline] + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.id = self.id.add_labels(tags); + self + } + + pub fn get(&self) -> f64 { + (self.getter)() + } +} + +impl f64 + Send + Sync> Transform for Gauge { + fn meter_id(&self) -> MeterId { + self.id.clone() + } + + fn transform(&self, metricer: &Metricer) -> MeterData { + MeterData { + service: metricer.service_name().to_owned(), + service_instance: metricer.instance_name().to_owned(), + timestamp: fetch_time(TimePeriod::Metric), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: self.id.name.to_owned(), + labels: self + .id + .labels + .iter() + .map(|(name, value)| Label { + name: name.clone(), + value: value.clone(), + }) + .collect(), + value: self.get(), + })), + } + } +} + +struct Bucket { + bucket: f64, + count: AtomicI64, +} + +impl Bucket { + fn new(bucker: f64) -> Self { + Self { + bucket: bucker, + count: Default::default(), + } + } +} + +pub struct Histogram { + id: MeterId, + buckets: Vec, +} + +impl Histogram { + pub fn new(name: impl ToString, mut steps: Vec) -> Self { + Self { + id: MeterId { + name: name.to_string(), + typ: MeterType::Histogram, + labels: vec![], + }, + buckets: { + steps.sort_by(|a, b| a.partial_cmp(b).unwrap_or(Equal)); + steps.dedup(); + steps.into_iter().map(Bucket::new).collect() + }, + } + } + + #[inline] + pub fn add_label(mut self, key: impl ToString, value: impl ToString) -> Self { + self.id = self.id.add_label(key, value); + self + } + + #[inline] + pub fn add_labels(mut self, tags: I) -> Self + where + K: ToString, + V: ToString, + I: IntoIterator, + { + self.id = self.id.add_labels(tags); + self + } + + pub fn add_value(&self, value: f64) { + if let Some(index) = self.find_bucket(value) { + self.buckets[index].count.fetch_add(1, Ordering::Acquire); + } + } + + fn find_bucket(&self, value: f64) -> Option { + match self + .buckets + .binary_search_by(|bucket| bucket.bucket.partial_cmp(&value).unwrap_or(Equal)) + { + Ok(i) => Some(i), + Err(i) => { + if i >= 1 { + Some(i - 1) + } else { + None + } + } + } + } +} + +impl Transform for Histogram { + fn meter_id(&self) -> MeterId { + self.id.clone() + } + + fn transform(&self, metricer: &Metricer) -> MeterData { + MeterData { + service: metricer.service_name().to_owned(), + service_instance: metricer.instance_name().to_owned(), + timestamp: fetch_time(TimePeriod::Metric), + metric: Some(Metric::Histogram(MeterHistogram { + name: self.id.name.to_owned(), + labels: self + .id + .labels + .iter() + .map(|(name, value)| Label { + name: name.clone(), + value: value.clone(), + }) + .collect(), + values: self + .buckets + .iter() + .map(|bucket| MeterBucketValue { + bucket: bucket.bucket, + count: bucket.count.load(Ordering::Acquire), + is_negative_infinity: false, + }) + .collect(), + })), + } + } +} diff --git a/src/metrics/metricer.rs b/src/metrics/metricer.rs new file mode 100644 index 0000000..81b8351 --- /dev/null +++ b/src/metrics/metricer.rs @@ -0,0 +1,125 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use super::meter::{MeterId, Transform}; +use crate::reporter::{CollectItem, DynReport, Report}; +use std::{ + collections::HashMap, + future::Future, + pin::Pin, + sync::Arc, + task::{Context, Poll}, + time::Duration, +}; +use tokio::{ + select, spawn, + sync::mpsc, + task::{spawn_blocking, JoinError, JoinHandle}, + time::interval, +}; + +pub struct Metricer { + service_name: String, + instance_name: String, + reporter: Box, + meter_map: HashMap>, + report_interval: Duration, +} + +impl Metricer { + /// New with service info and reporter. + pub fn new( + service_name: impl ToString, + instance_name: impl ToString, + reporter: impl Report + Send + Sync + 'static, + ) -> Self { + Self { + service_name: service_name.to_string(), + instance_name: instance_name.to_string(), + reporter: Box::new(reporter), + meter_map: Default::default(), + report_interval: Duration::from_secs(20), + } + } + + pub fn service_name(&self) -> &str { + &self.service_name + } + + pub fn instance_name(&self) -> &str { + &self.instance_name + } + + pub fn set_report_interval(&mut self, report_interval: Duration) { + self.report_interval = report_interval; + } + + pub fn register(&mut self, transform: T) -> Arc { + let transform = Arc::new(transform); + self.meter_map + .insert(transform.meter_id(), transform.clone()); + transform + } + + pub fn boot(self) -> Booting { + let (shutdown_tx, mut shutdown_rx) = mpsc::channel(1); + + let handle = spawn(async move { + let mut ticker = interval(self.report_interval); + let metricer = Arc::new(self); + loop { + let metricer_ = metricer.clone(); + let _ = spawn_blocking(move || { + for trans in metricer_.meter_map.values() { + metricer_ + .reporter + .report(CollectItem::Meter(trans.transform(&metricer_))); + } + }) + .await; + + select! { + _ = ticker.tick() => {} + _ = shutdown_rx.recv() => { return; } + } + } + }); + Booting { + handle, + shutdown_tx, + } + } +} + +pub struct Booting { + handle: JoinHandle<()>, + shutdown_tx: mpsc::Sender<()>, +} + +impl Booting { + pub async fn shutdown(self) -> crate::Result<()> { + self.shutdown_tx.send(()).await.unwrap(); + Ok(self.await?) + } +} + +impl Future for Booting { + type Output = Result<(), JoinError>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.handle).poll(cx) + } +} diff --git a/src/metrics/mod.rs b/src/metrics/mod.rs new file mode 100644 index 0000000..3c67e08 --- /dev/null +++ b/src/metrics/mod.rs @@ -0,0 +1,18 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +pub mod meter; +pub mod metricer; diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs index eb92517..fcea63d 100644 --- a/src/reporter/grpc.rs +++ b/src/reporter/grpc.rs @@ -18,7 +18,8 @@ use crate::{ reporter::{CollectItem, Report}, skywalking_proto::v3::{ log_report_service_client::LogReportServiceClient, - trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, + meter_report_service_client::MeterReportServiceClient, + trace_segment_report_service_client::TraceSegmentReportServiceClient, LogData, MeterData, SegmentObject, }, }; @@ -102,6 +103,7 @@ impl ColletcItemConsume for mpsc::UnboundedReceiver { struct Inner { trace_client: Mutex>, log_client: Mutex>, + meter_client: Mutex>, producer: P, consumer: Mutex>, is_reporting: AtomicBool, @@ -135,7 +137,8 @@ impl GrpcReporter { Self { inner: Arc::new(Inner { trace_client: Mutex::new(TraceSegmentReportServiceClient::new(channel.clone())), - log_client: Mutex::new(LogReportServiceClient::new(channel)), + log_client: Mutex::new(LogReportServiceClient::new(channel.clone())), + meter_client: Mutex::new(MeterReportServiceClient::new(channel)), producer, consumer: Mutex::new(Some(consumer)), is_reporting: Default::default(), @@ -168,9 +171,10 @@ impl GrpcReporter { Reporting { rb: ReporterAndBuffer { inner: Arc::clone(&self.inner), + status_handle: None, trace_buffer: Default::default(), log_buffer: Default::default(), - status_handle: None, + meter_buffer: Default::default(), }, shutdown_signal: Box::pin(pending()), consumer: self.inner.consumer.lock().await.take().unwrap(), @@ -201,9 +205,10 @@ impl Report for GrpcReporter struct ReporterAndBuffer { inner: Arc>, + status_handle: Option>, trace_buffer: LinkedList, log_buffer: LinkedList, - status_handle: Option>, + meter_buffer: LinkedList, } impl ReporterAndBuffer { @@ -216,6 +221,9 @@ impl ReporterAndBuffer { CollectItem::Log(item) => { self.log_buffer.push_back(item); } + CollectItem::Meter(item) => { + self.meter_buffer.push_back(item); + } } if !self.trace_buffer.is_empty() { @@ -248,6 +256,22 @@ impl ReporterAndBuffer { } } } + + if !self.meter_buffer.is_empty() { + let buffer = take(&mut self.meter_buffer); + if let Err(e) = self + .inner + .meter_client + .lock() + .await + .collect(stream::iter(buffer)) + .await + { + if let Some(status_handle) = &self.status_handle { + status_handle(e); + } + } + } } } diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs index 044afa2..79735bf 100644 --- a/src/reporter/mod.rs +++ b/src/reporter/mod.rs @@ -17,7 +17,7 @@ pub mod grpc; pub mod print; -use crate::skywalking_proto::v3::{LogData, SegmentObject}; +use crate::skywalking_proto::v3::{LogData, MeterData, SegmentObject}; use serde::{Deserialize, Serialize}; use std::{ops::Deref, sync::Arc}; use tokio::sync::OnceCell; @@ -27,6 +27,7 @@ use tokio::sync::OnceCell; pub enum CollectItem { Trace(SegmentObject), Log(LogData), + Meter(MeterData), } pub(crate) type DynReport = dyn Report + Send + Sync + 'static; diff --git a/src/reporter/print.rs b/src/reporter/print.rs index 6b355e5..e5b640f 100644 --- a/src/reporter/print.rs +++ b/src/reporter/print.rs @@ -36,11 +36,11 @@ impl PrintReporter { impl Report for PrintReporter { fn report(&self, items: CollectItem) { match items { - CollectItem::Trace(segment) => { + CollectItem::Trace(data) => { if self.use_stderr { - eprintln!("trace segment={:?}", segment); + eprintln!("trace segment={:?}", data); } else { - println!("trace segment={:?}", segment); + println!("trace segment={:?}", data); } } CollectItem::Log(data) => { @@ -50,6 +50,13 @@ impl Report for PrintReporter { println!("log data={:?}", data); } } + CollectItem::Meter(data) => { + if self.use_stderr { + eprintln!("meter data={:?}", data); + } else { + println!("meter data={:?}", data); + } + } } } } diff --git a/tests/metrics.rs b/tests/metrics.rs new file mode 100644 index 0000000..f1d95d0 --- /dev/null +++ b/tests/metrics.rs @@ -0,0 +1,174 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// + +use skywalking::{ + metrics::{ + meter::{Counter, Gauge, Histogram}, + metricer::Metricer, + }, + reporter::{CollectItem, Report}, + skywalking_proto::v3::{ + meter_data::Metric, Label, MeterBucketValue, MeterData, MeterHistogram, MeterSingleValue, + }, +}; +use std::{ + collections::LinkedList, + sync::{Arc, Mutex}, +}; + +#[tokio::test(flavor = "multi_thread", worker_threads = 1)] +async fn metrics() { + let reporter = Arc::new(MockReporter::default()); + + { + let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone()); + let counter = metricer.register( + Counter::new("instance_trace_count") + .add_label("region", "us-west") + .add_labels([("az", "az-1")]), + ); + counter.increment(100.); + + metricer.boot().shutdown().await.unwrap(); + + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: "instance_trace_count".to_owned(), + labels: vec![ + Label { + name: "region".to_owned(), + value: "us-west".to_owned() + }, + Label { + name: "az".to_owned(), + value: "az-1".to_owned() + }, + ], + value: 100. + })), + } + ); + } + + { + let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone()); + metricer.register( + Gauge::new("instance_trace_count", || 100.) + .add_label("region", "us-west") + .add_labels([("az", "az-1")]), + ); + + metricer.boot().shutdown().await.unwrap(); + + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: Some(Metric::SingleValue(MeterSingleValue { + name: "instance_trace_count".to_owned(), + labels: vec![ + Label { + name: "region".to_owned(), + value: "us-west".to_owned() + }, + Label { + name: "az".to_owned(), + value: "az-1".to_owned() + }, + ], + value: 100. + })), + } + ); + } + + { + let mut metricer = Metricer::new("service_name", "instance_name", reporter.clone()); + let histogram = metricer.register( + Histogram::new("instance_trace_count", vec![1., 2.]) + .add_label("region", "us-west") + .add_labels([("az", "az-1")]), + ); + histogram.add_value(1.); + histogram.add_value(1.5); + histogram.add_value(2.); + + metricer.boot().shutdown().await.unwrap(); + + assert_eq!( + reporter.pop(), + MeterData { + timestamp: 10, + service: "service_name".to_owned(), + service_instance: "instance_name".to_owned(), + metric: Some(Metric::Histogram(MeterHistogram { + name: "instance_trace_count".to_owned(), + labels: vec![ + Label { + name: "region".to_owned(), + value: "us-west".to_owned() + }, + Label { + name: "az".to_owned(), + value: "az-1".to_owned() + }, + ], + values: vec![ + MeterBucketValue { + bucket: 1., + count: 2, + is_negative_infinity: false + }, + MeterBucketValue { + bucket: 2., + count: 1, + is_negative_infinity: false + }, + ] + })), + } + ); + } +} + +#[derive(Default, Clone)] +struct MockReporter { + items: Arc>>, +} + +impl MockReporter { + fn pop(&self) -> MeterData { + self.items.try_lock().unwrap().pop_back().unwrap() + } +} + +impl Report for MockReporter { + fn report(&self, item: CollectItem) { + match item { + CollectItem::Meter(data) => { + self.items.try_lock().unwrap().push_back(data); + } + _ => {} + } + } +}