Skip to content

Commit

Permalink
Add metrics APIs. (#44)
Browse files Browse the repository at this point in the history
  • Loading branch information
jmjoy committed Aug 25, 2022
1 parent 4d786ff commit 4876628
Show file tree
Hide file tree
Showing 17 changed files with 859 additions and 13 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Expand Up @@ -43,6 +43,7 @@ bytes = "1.2.1"
cfg-if = "1.0.0"
futures-core = "0.3.21"
futures-util = "0.3.21"
portable-atomic = { version = "0.3.13", features = ["float"] }
prost = "0.11.0"
prost-derive = "0.11.0"
serde = { version = "1.0.143", features = ["derive"] }
Expand All @@ -68,6 +69,10 @@ required-features = ["mock"]
name = "logging"
required-features = ["mock"]

[[test]]
name = "metrics"
required-features = ["mock"]

[[example]]
name = "simple_trace_report"
path = "examples/simple_trace_report.rs"
28 changes: 26 additions & 2 deletions README.md
Expand Up @@ -47,13 +47,22 @@ the context after the span finished.

LogRecord is the simple builder for the LogData, which is the Log format of Skywalking.

## Metrics

### Meter

- **Counter** API represents a single monotonically increasing counter which automatically collects data and reports to the backend.
- **Gauge** API represents a single numerical value.
- **Histogram** API represents a summary sample observations with customized buckets.

# Example

```rust, no_run
use skywalking::{
logging::{logger::Logger, record::{LogRecord, RecordType}},
reporter::grpc::GrpcReporter,
trace::tracer::Tracer,
metrics::{meter::Counter, metricer::Metricer},
};
use std::error::Error;
use tokio::signal;
Expand Down Expand Up @@ -94,6 +103,18 @@ async fn handle_request(tracer: Tracer, logger: Logger) {
// Auto report ctx when dropped.
}
async fn handle_metric(mut metricer: Metricer) {
let counter = metricer.register(
Counter::new("instance_trace_count")
.add_label("region", "us-west")
.add_label("az", "az-1"),
);
metricer.boot().await;
counter.increment(10.);
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
Expand All @@ -109,7 +130,10 @@ async fn main() -> Result<(), Box<dyn Error>> {
.spawn();
let tracer = Tracer::new("service", "instance", reporter.clone());
let logger = Logger::new("service", "instance", reporter);
let logger = Logger::new("service", "instance", reporter.clone());
let metricer = Metricer::new("service", "instance", reporter);
handle_metric(metricer).await;
handle_request(tracer, logger).await;
Expand Down Expand Up @@ -167,7 +191,7 @@ For details, please refer to [prost-build:sourcing-protoc](https://docs.rs/prost

# Release

The SkyWalking committer(PMC included) could follow [this doc](Release-guide.md) to release an official version.
The SkyWalking committer(PMC included) could follow [this doc](https://github.com/apache/skywalking-rust/blob/master/Release-guide.md) to release an official version.

# License

Expand Down
1 change: 1 addition & 0 deletions build.rs
Expand Up @@ -24,6 +24,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
.compile(
&[
"./skywalking-data-collect-protocol/language-agent/Tracing.proto",
"./skywalking-data-collect-protocol/language-agent/Meter.proto",
"./skywalking-data-collect-protocol/logging/Logging.proto",
],
&["./skywalking-data-collect-protocol"],
Expand Down
30 changes: 30 additions & 0 deletions e2e/data/expected_context.yaml
Expand Up @@ -146,3 +146,33 @@ logItems:
value: DEBUG
timestamp: gt 0
serviceName: consumer

meterItems:
- serviceName: consumer
meterSize: 3
meters:
- meterId:
name: instance_trace_count
tags:
- name: region
value: us-west
- name: az
value: az-1
singleValue: 30.0
- meterId:
name: instance_trace_count
tags:
- name: region
value: us-east
- name: az
value: az-3
singleValue: 20.0
- meterId:
name: instance_trace_count
tags:
- name: region
value: us-north
- name: az
value: az-1
histogramBuckets: [10.0, 20.0, 30.0]
histogramValues: [1, 2, 0]
34 changes: 33 additions & 1 deletion e2e/src/main.rs
Expand Up @@ -25,6 +25,10 @@ use skywalking::{
logger::{self, Logger},
record::{LogRecord, RecordType},
},
metrics::{
meter::{Counter, Gauge, Histogram},
metricer::Metricer,
},
reporter::grpc::GrpcReporter,
trace::{
propagation::{
Expand Down Expand Up @@ -172,6 +176,33 @@ async fn run_consumer_service(host: [u8; 4]) {
}
}

fn run_consumer_metric(mut metricer: Metricer) {
let counter = metricer.register(
Counter::new("instance_trace_count")
.add_label("region", "us-west")
.add_label("az", "az-1"),
);
metricer.register(
Gauge::new("instance_trace_count", || 20.)
.add_label("region", "us-east")
.add_label("az", "az-3"),
);
let histogram = metricer.register(
Histogram::new("instance_trace_count", vec![10., 20., 30.])
.add_label("region", "us-north")
.add_label("az", "az-1"),
);

counter.increment(10.);
counter.increment(20.);

histogram.add_value(10.);
histogram.add_value(29.);
histogram.add_value(20.);

metricer.boot();
}

#[derive(StructOpt)]
#[structopt(name = "basic")]
struct Opt {
Expand All @@ -187,7 +218,8 @@ async fn main() -> Result<(), Box<dyn Error>> {

if opt.mode == "consumer" {
tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
logger::set_global_logger(Logger::new("consumer", "node_0", reporter));
logger::set_global_logger(Logger::new("consumer", "node_0", reporter.clone()));
run_consumer_metric(Metricer::new("consumer", "node_0", reporter));
run_consumer_service([0, 0, 0, 0]).await;
} else if opt.mode == "producer" {
tracer::set_global_tracer(Tracer::new("producer", "node_0", reporter.clone()));
Expand Down
55 changes: 55 additions & 0 deletions examples/simple_metric_report.rs
@@ -0,0 +1,55 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//

use skywalking::{
metrics::{meter::Counter, metricer::Metricer},
reporter::grpc::GrpcReporter,
};
use std::error::Error;
use tokio::signal;

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// Connect to skywalking oap server.
let reporter = GrpcReporter::connect("http://0.0.0.0:11800").await?;

// Spawn the reporting in background, with listening the graceful shutdown
// signal.
let handle = reporter
.reporting()
.await
.with_graceful_shutdown(async move {
signal::ctrl_c().await.expect("failed to listen for event");
})
.spawn();

// Do metrics.
let mut metricer = Metricer::new("service", "instance", reporter.clone());
let counter = metricer.register(
Counter::new("instance_trace_count")
.add_label("region", "us-west")
.add_label("az", "az-1"),
);

counter.increment(1.);

metricer.boot().await.unwrap();
handle.await.unwrap();

Ok(())
}
2 changes: 2 additions & 0 deletions src/common/system_time.rs
Expand Up @@ -19,6 +19,7 @@ use cfg_if::cfg_if;
pub(crate) enum TimePeriod {
Start,
Log,
Metric,
End,
}

Expand All @@ -28,6 +29,7 @@ cfg_if! {
match period {
TimePeriod::Start => 1,
TimePeriod::Log => 10,
TimePeriod::Metric => 10,
TimePeriod::End => 100,
}
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Expand Up @@ -20,6 +20,7 @@
pub mod common;
pub(crate) mod error;
pub mod logging;
pub mod metrics;
pub mod reporter;
pub mod skywalking_proto;
pub mod trace;
Expand Down
4 changes: 2 additions & 2 deletions src/logging/logger.rs
Expand Up @@ -22,8 +22,8 @@ use tokio::sync::OnceCell;
static GLOBAL_LOGGER: OnceCell<Logger> = OnceCell::const_new();

/// Set the global logger.
pub fn set_global_logger(tracer: Logger) {
if GLOBAL_LOGGER.set(tracer).is_err() {
pub fn set_global_logger(logger: Logger) {
if GLOBAL_LOGGER.set(logger).is_err() {
panic!("global logger has setted")
}
}
Expand Down
3 changes: 3 additions & 0 deletions src/logging/record.rs
Expand Up @@ -55,16 +55,19 @@ impl LogRecord {
Default::default()
}

#[inline]
pub fn custome_time(mut self, time: SystemTime) -> Self {
self.time = Some(time);
self
}

#[inline]
pub fn ignore_time(mut self) -> Self {
self.is_ignore_time = true;
self
}

#[inline]
pub fn endpoint(mut self, endpoint: impl ToString) -> Self {
self.endpoint = endpoint.to_string();
self
Expand Down

0 comments on commit 4876628

Please sign in to comment.