From 84fbd006426b7863ce62ce83ff01f6b98b1c560b Mon Sep 17 00:00:00 2001
From: jmjoy
Date: Mon, 19 Jun 2023 18:48:22 +0800
Subject: [PATCH] Add kafka reporter. (#61)

---
 .github/workflows/ci.yaml                    |   1 +
 .github/workflows/e2e.yml                    |   2 +
 Cargo.toml                                   |   3 +
 README.md                                    |  36 ++
 dist-material/LICENSE                        |  16 +-
 .../licenses/LICENSE-rdkafka-sys.txt         |  21 ++
 dist-material/licenses/LICENSE-rdkafka.txt   |  21 ++
 docker-compose.e2e.yml                       |  14 +
 e2e/Cargo.toml                               |  11 +-
 e2e/docker/Dockerfile                        |   1 +
 e2e/src/e2e_kafka.rs                         | 293 +++++++++++++++
 e2e/src/main.rs                              |  55 ++-
 src/error/mod.rs                             |   5 +
 src/proto/v3/mod.rs                          |   1 +
 src/reporter/grpc.rs                         |  62 +---
 src/reporter/kafka.rs                        | 348 ++++++++++++++++++
 src/reporter/mod.rs                          | 115 +++++-
 17 files changed, 934 insertions(+), 71 deletions(-)
 create mode 100644 dist-material/licenses/LICENSE-rdkafka-sys.txt
 create mode 100644 dist-material/licenses/LICENSE-rdkafka.txt
 create mode 100644 e2e/src/e2e_kafka.rs
 create mode 100644 src/reporter/kafka.rs

diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml
index 10d189e..832700c 100644
--- a/.github/workflows/ci.yaml
+++ b/.github/workflows/ci.yaml
@@ -91,6 +91,7 @@ jobs:
           - ""
           - "--features management"
           - "--features vendored"
+          - "--features kafka-reporter"
          - "--all-features"
     runs-on: ubuntu-20.04
     steps:
diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index d1fab52..79609e2 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -40,3 +40,5 @@ jobs:
           pip3 install setuptools
           pip3 install -r requirements.txt
           python3 e2e/run_e2e.py --expected_file=e2e/data/expected_context.yaml --max_retry_times=3 --target_path=/ping
+          docker-compose -f docker-compose.e2e.yml logs producer consumer
+          docker-compose -f docker-compose.e2e.yml exec -T producer /build/target/release/e2e-kafka
diff --git a/Cargo.toml b/Cargo.toml
index da1df32..909adb7 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -36,6 +36,7 @@ rust-version = "1.65"
 
 [features]
 management = ["hostname", "systemstat"]
 vendored = ["protobuf-src"]
+kafka-reporter = ["rdkafka"]
 mock = []  # For internal integration testing only, do not use.
 
@@ -46,11 +47,13 @@ cfg-if = "1.0.0"
 futures-core = "0.3.21"
 futures-util = "0.3.21"
 hostname = { version = "0.3.1", optional = true }
+libz-sys = "1.1.9"
 once_cell = "1.14.0"
 parking_lot = "0.12.1"
 portable-atomic = { version = "0.3.13", features = ["float"] }
 prost = "0.11.0"
 prost-derive = "0.11.0"
+rdkafka = { version = "0.32.2", optional = true }
 serde = { version = "1.0.143", features = ["derive"] }
 systemstat = { version = "0.2.0", optional = true }
 thiserror = "1.0.32"
diff --git a/README.md b/README.md
index 715001e..d01ab4a 100644
--- a/README.md
+++ b/README.md
@@ -212,6 +212,42 @@ async fn handle(tracer: Tracer) {
 }
 ```
 
+# Advanced Reporter
+
+The advanced reporter provides an alternative way to submit the agent-collected data to the backend.
+
+## Kafka reporter
+
+The Kafka reporter plugin supports reporting traces, metrics, logs, and instance properties to a Kafka cluster.
+
+Make sure the feature `kafka-reporter` is enabled.
+
+```rust
+#[cfg(feature = "kafka-reporter")]
+mod example {
+    use skywalking::reporter::Report;
+    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig};
+
+    async fn do_something(reporter: &impl Report) {
+        // ....
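+        // For example, hand the reporter to a `Tracer` (as `e2e/src/main.rs`
+        // does in this patch) and create spans with it; finished segments are
+        // then submitted through `Report::report`.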
+    }
+
+    async fn foo() {
+        let mut client_config = RDKafkaClientConfig::new();
+        client_config
+            .set("bootstrap.servers", "broker:9092")
+            .set("message.timeout.ms", "6000");
+
+        let (reporter, reporting) = KafkaReportBuilder::new(client_config).build().await.unwrap();
+        let handle = reporting.spawn();
+
+        do_something(&reporter).await;
+
+        handle.await.unwrap();
+    }
+}
+```
+
 # How to compile?
 
 If you have `skywalking-(VERSION).crate`, you can unpack it with the way as follows:
diff --git a/dist-material/LICENSE b/dist-material/LICENSE
index 6c5eda5..a7c1296 100644
--- a/dist-material/LICENSE
+++ b/dist-material/LICENSE
@@ -220,9 +220,11 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/autocfg/1.1.0 1.1.0 Apache-2.0 OR MIT
     https://crates.io/crates/base64/0.13.0 0.13.0 Apache-2.0 OR MIT
     https://crates.io/crates/bitflags/1.3.1 1.3.1 Apache-2.0 OR MIT
-    https://crates.io/crates/cc/1.0.0 1.0.0 Apache-2.0 OR MIT
+    https://crates.io/crates/cc/1.0.18 1.0.18 Apache-2.0 OR MIT
     https://crates.io/crates/cfg-if/0.1.2 0.1.2 Apache-2.0 OR MIT
     https://crates.io/crates/cfg-if/1.0.0 1.0.0 Apache-2.0 OR MIT
+    https://crates.io/crates/derivative/2.0.0 2.0.0 Apache-2.0 OR MIT
+    https://crates.io/crates/dtoa/0.4.0 0.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/either/1.0.0 1.0.0 Apache-2.0 OR MIT
     https://crates.io/crates/fixedbitset/0.4.0 0.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/fnv/1.0.5 1.0.5 Apache-2.0 OR MIT
@@ -242,10 +244,12 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/hyper-timeout/0.4.0 0.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/indexmap/1.6.2 1.6.2 Apache-2.0 OR MIT
     https://crates.io/crates/itertools/0.10.0 0.10.0 Apache-2.0 OR MIT
+    https://crates.io/crates/itoa/0.3.0 0.3.0 Apache-2.0 OR MIT
     https://crates.io/crates/itoa/0.4.1 0.4.1 Apache-2.0 OR MIT
     https://crates.io/crates/itoa/1.0.1 1.0.1 Apache-2.0 OR MIT
     https://crates.io/crates/lazy_static/1.4.0 1.4.0 Apache-2.0 OR MIT
     https://crates.io/crates/libc/0.2.114 0.2.114 Apache-2.0 OR MIT
+    https://crates.io/crates/libz-sys/1.1.9 1.1.9 Apache-2.0 OR MIT
     https://crates.io/crates/lock_api/0.4.6 0.4.6 Apache-2.0 OR MIT
     https://crates.io/crates/log/0.4.17 0.4.17 Apache-2.0 OR MIT
     https://crates.io/crates/match_cfg/0.1.0 0.1.0 Apache-2.0 OR MIT
@@ -254,6 +258,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
     https://crates.io/crates/miow/0.3.6 0.3.6 Apache-2.0 OR MIT
     https://crates.io/crates/multimap/0.8.0 0.8.0 Apache-2.0 OR MIT
     https://crates.io/crates/ntapi/0.3.0 0.3.0 Apache-2.0 OR MIT
+    https://crates.io/crates/num-traits/0.1.32 0.1.32 Apache-2.0 OR MIT
     https://crates.io/crates/num_cpus/1.8.0 1.8.0 Apache-2.0 OR MIT
     https://crates.io/crates/num_threads/0.1.2 0.1.2 Apache-2.0 OR MIT
     https://crates.io/crates/once_cell/1.14.0 1.14.0 Apache-2.0 OR MIT
@@ -267,9 +272,11 @@ The text of each license is also included in licenses/LICENSE-[project].txt.
https://crates.io/crates/pin-project-internal/1.0.2 1.0.2 Apache-2.0 OR MIT https://crates.io/crates/pin-project-lite/0.2.9 0.2.9 Apache-2.0 OR MIT https://crates.io/crates/pin-utils/0.1.0 0.1.0 Apache-2.0 OR MIT + https://crates.io/crates/pkg-config/0.3.9 0.3.9 Apache-2.0 OR MIT https://crates.io/crates/portable-atomic/0.3.13 0.3.13 Apache-2.0 OR MIT https://crates.io/crates/ppv-lite86/0.2.8 0.2.8 Apache-2.0 OR MIT https://crates.io/crates/prettyplease/0.1.0 0.1.0 Apache-2.0 OR MIT + https://crates.io/crates/proc-macro-crate/0.1.4 0.1.4 Apache-2.0 OR MIT https://crates.io/crates/proc-macro2/1.0.32 1.0.32 Apache-2.0 OR MIT https://crates.io/crates/quote/1.0.0 1.0.0 Apache-2.0 OR MIT https://crates.io/crates/rand/0.4.1 0.4.1 Apache-2.0 OR MIT @@ -283,6 +290,7 @@ The text of each license is also included in licenses/LICENSE-[project].txt. https://crates.io/crates/scopeguard/1.1.0 1.1.0 Apache-2.0 OR MIT https://crates.io/crates/serde/1.0.143 1.0.143 Apache-2.0 OR MIT https://crates.io/crates/serde_derive/1.0.143 1.0.143 Apache-2.0 OR MIT + https://crates.io/crates/serde_json/1.0.0 1.0.0 Apache-2.0 OR MIT https://crates.io/crates/signal-hook-registry/1.1.1 1.1.1 Apache-2.0 OR MIT https://crates.io/crates/smallvec/1.6.1 1.6.1 Apache-2.0 OR MIT https://crates.io/crates/socket2/0.3.17 0.3.17 Apache-2.0 OR MIT @@ -295,10 +303,12 @@ The text of each license is also included in licenses/LICENSE-[project].txt. https://crates.io/crates/thiserror-impl/1.0.32 1.0.32 Apache-2.0 OR MIT https://crates.io/crates/time/0.3.9 0.3.9 Apache-2.0 OR MIT https://crates.io/crates/tokio-io-timeout/1.0.1 1.0.1 Apache-2.0 OR MIT + https://crates.io/crates/toml/0.5.2 0.5.2 Apache-2.0 OR MIT https://crates.io/crates/unicode-segmentation/1.2.0 1.2.0 Apache-2.0 OR MIT https://crates.io/crates/unicode-width/0.1.4 0.1.4 Apache-2.0 OR MIT https://crates.io/crates/unicode-xid/0.2.0 0.2.0 Apache-2.0 OR MIT https://crates.io/crates/uuid/1.1.2 1.1.2 Apache-2.0 OR MIT + https://crates.io/crates/vcpkg/0.2.0 0.2.0 Apache-2.0 OR MIT https://crates.io/crates/vec_map/0.8.0 0.8.0 Apache-2.0 OR MIT https://crates.io/crates/version_check/0.9.0 0.9.0 Apache-2.0 OR MIT https://crates.io/crates/winapi/0.3.9 0.3.9 Apache-2.0 OR MIT @@ -319,6 +329,8 @@ The text of each license is also included in licenses/LICENSE-[project].txt. https://crates.io/crates/fuchsia-zircon/0.3.1 0.3.1 BSD-3-Clause https://crates.io/crates/fuchsia-zircon-sys/0.3.1 0.3.1 BSD-3-Clause + https://crates.io/crates/num_enum/0.5.0 0.5.0 BSD-3-Clause + https://crates.io/crates/num_enum_derive/0.5.0 0.5.0 BSD-3-Clause ======================================================================== MIT licenses @@ -345,6 +357,8 @@ The text of each license is also included in licenses/LICENSE-[project].txt. 
https://crates.io/crates/mio/0.8.1 0.8.1 MIT https://crates.io/crates/nom/7.0.0 7.0.0 MIT https://crates.io/crates/proc-macro-error/0.2.0 0.2.0 MIT + https://crates.io/crates/rdkafka/0.32.2 0.32.2 MIT + https://crates.io/crates/rdkafka-sys/4.5.0+1.9.2 4.5.0+1.9.2 MIT https://crates.io/crates/redox_syscall/0.1.38 0.1.38 MIT https://crates.io/crates/redox_syscall/0.2.8 0.2.8 MIT https://crates.io/crates/slab/0.4.2 0.4.2 MIT diff --git a/dist-material/licenses/LICENSE-rdkafka-sys.txt b/dist-material/licenses/LICENSE-rdkafka-sys.txt new file mode 100644 index 0000000..eb7158a --- /dev/null +++ b/dist-material/licenses/LICENSE-rdkafka-sys.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Federico Giraud + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/dist-material/licenses/LICENSE-rdkafka.txt b/dist-material/licenses/LICENSE-rdkafka.txt new file mode 100644 index 0000000..eb7158a --- /dev/null +++ b/dist-material/licenses/LICENSE-rdkafka.txt @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2016 Federico Giraud + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. 
diff --git a/docker-compose.e2e.yml b/docker-compose.e2e.yml index 3462189..84c7d92 100644 --- a/docker-compose.e2e.yml +++ b/docker-compose.e2e.yml @@ -37,6 +37,8 @@ services: depends_on: collector: condition: service_healthy + broker: + condition: service_healthy healthcheck: test: [ "CMD", "curl", "http://0.0.0.0:8082/healthCheck" ] interval: 5s @@ -54,3 +56,15 @@ services: condition: service_healthy consumer: condition: service_healthy + broker: + condition: service_healthy + + broker: + image: landoop/fast-data-dev:3.3 + container_name: broker + ports: + - "9092:9092" + healthcheck: + test: [ "CMD", "nc", "-zv", "0.0.0.0", "9092"] + interval: 5s + timeout: 5s diff --git a/e2e/Cargo.toml b/e2e/Cargo.toml index 2b156d0..d2e49e1 100644 --- a/e2e/Cargo.toml +++ b/e2e/Cargo.toml @@ -22,9 +22,16 @@ authors = ["Apache Software Foundation"] edition = "2021" publish = false license = "Apache-2.0" +default-run = "e2e" + +[[bin]] +name = "e2e-kafka" +path = "src/e2e_kafka.rs" [dependencies] -skywalking = { path = ".." } hyper = { version = "0.14", features = ["full"] } -tokio = { version = "1", features = ["full"] } +prost = "0.11.0" +rdkafka = "0.32.2" +skywalking = { path = "..", features = ["kafka-reporter"] } structopt = "0.3" +tokio = { version = "1", features = ["full"] } diff --git a/e2e/docker/Dockerfile b/e2e/docker/Dockerfile index 94ab5c3..13fb2d2 100644 --- a/e2e/docker/Dockerfile +++ b/e2e/docker/Dockerfile @@ -20,4 +20,5 @@ RUN apt-get update && apt-get install -y cmake protobuf-compiler WORKDIR /build COPY . /build/ RUN cargo build --release --workspace +ENV RUST_BACKTRACE=1 ENTRYPOINT ["/build/target/release/e2e"] diff --git a/e2e/src/e2e_kafka.rs b/e2e/src/e2e_kafka.rs new file mode 100644 index 0000000..e785050 --- /dev/null +++ b/e2e/src/e2e_kafka.rs @@ -0,0 +1,293 @@ +// Licensed to the Apache Software Foundation (ASF) under one or more +// contributor license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright ownership. +// The ASF licenses this file to You under the Apache License, Version 2.0 +// (the "License"); you may not use this file except in compliance with +// the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// + +#![allow(clippy::bool_assert_comparison)] + +use prost::Message; +use rdkafka::{ + consumer::{Consumer, StreamConsumer}, + ClientConfig, Message as _, +}; +use skywalking::proto::v3::{ + log_data_body::Content, meter_data::Metric, JsonLog, KeyStringValuePair, LogData, LogTags, + MeterData, RefType, SegmentObject, SpanLayer, SpanType, +}; +use std::time::Duration; +use tokio::time::timeout; + +async fn segment() { + let consumer = create_consumer("skywalking-segments"); + + for _ in 0..3 { + let segment: SegmentObject = consumer_recv(&consumer).await; + check_segment(&segment); + } +} + +fn check_segment(segment: &SegmentObject) { + dbg!(segment); + + assert_eq!(segment.service_instance, "node_0"); + + if segment.service == "consumer" { + assert_eq!(segment.spans.len(), 1); + + assert_eq!(segment.spans[0].span_id, 0); + assert_eq!(segment.spans[0].parent_span_id, -1); + assert_eq!(segment.spans[0].operation_name, "/pong"); + assert_eq!(segment.spans[0].peer, ""); + assert_eq!(segment.spans[0].span_type, SpanType::Entry as i32); + assert_eq!(segment.spans[0].span_layer, SpanLayer::Http as i32); + assert_eq!(segment.spans[0].component_id, 11000); + assert_eq!(segment.spans[0].is_error, false); + assert_eq!(segment.spans[0].tags.len(), 0); + assert_eq!(segment.spans[0].logs.len(), 0); + assert_eq!(segment.spans[0].refs.len(), 1); + + assert_eq!( + segment.spans[0].refs[0].ref_type, + RefType::CrossProcess as i32 + ); + assert_eq!(segment.spans[0].refs[0].parent_span_id, 1); + assert_eq!(segment.spans[0].refs[0].parent_service, "producer"); + assert_eq!(segment.spans[0].refs[0].parent_service_instance, "node_0"); + assert_eq!(segment.spans[0].refs[0].parent_endpoint, "/pong"); + assert_eq!( + segment.spans[0].refs[0].network_address_used_at_peer, + "consumer:8082" + ); + } else if segment.service == "producer" { + if segment.spans.last().unwrap().operation_name == "/ping" { + assert_eq!(segment.spans.len(), 3); + + assert_eq!(segment.spans[0].span_id, 1); + assert_eq!(segment.spans[0].parent_span_id, 0); + assert_eq!(segment.spans[0].operation_name, "/pong"); + assert_eq!(segment.spans[0].peer, "consumer:8082"); + assert_eq!(segment.spans[0].span_type, SpanType::Exit as i32); + assert_eq!(segment.spans[0].span_layer, SpanLayer::Unknown as i32); + assert_eq!(segment.spans[0].component_id, 11000); + assert_eq!(segment.spans[0].is_error, false); + assert_eq!(segment.spans[0].tags.len(), 0); + assert_eq!(segment.spans[0].logs.len(), 0); + assert_eq!(segment.spans[0].refs.len(), 0); + + assert_eq!(segment.spans[1].span_id, 2); + assert_eq!(segment.spans[1].parent_span_id, 0); + assert_eq!(segment.spans[1].operation_name, "async-job"); + assert_eq!(segment.spans[1].peer, ""); + assert_eq!(segment.spans[1].span_type, SpanType::Local as i32); + assert_eq!(segment.spans[1].span_layer, SpanLayer::Unknown as i32); + assert_eq!(segment.spans[1].component_id, 11000); + assert_eq!(segment.spans[1].is_error, false); + assert_eq!(segment.spans[1].tags.len(), 0); + assert_eq!(segment.spans[1].logs.len(), 0); + assert_eq!(segment.spans[1].refs.len(), 0); + + assert_eq!(segment.spans[2].span_id, 0); + assert_eq!(segment.spans[2].parent_span_id, -1); + assert_eq!(segment.spans[2].operation_name, "/ping"); + assert_eq!(segment.spans[2].peer, ""); + assert_eq!(segment.spans[2].span_type, SpanType::Entry as i32); + assert_eq!(segment.spans[2].span_layer, SpanLayer::Http as i32); + assert_eq!(segment.spans[2].component_id, 11000); + assert_eq!(segment.spans[2].is_error, false); + 
assert_eq!(segment.spans[2].tags.len(), 0); + assert_eq!(segment.spans[2].logs.len(), 0); + assert_eq!(segment.spans[2].refs.len(), 0); + } else if segment.spans.last().unwrap().operation_name == "async-callback" { + assert_eq!(segment.spans.len(), 1); + + assert_eq!(segment.spans[0].span_id, 0); + assert_eq!(segment.spans[0].parent_span_id, -1); + assert_eq!(segment.spans[0].peer, ""); + assert_eq!(segment.spans[0].span_type, SpanType::Entry as i32); + assert_eq!(segment.spans[0].span_layer, SpanLayer::Http as i32); + assert_eq!(segment.spans[0].component_id, 11000); + assert_eq!(segment.spans[0].is_error, false); + assert_eq!(segment.spans[0].tags.len(), 0); + assert_eq!(segment.spans[0].logs.len(), 0); + assert_eq!(segment.spans[0].refs.len(), 1); + + assert_eq!( + segment.spans[0].refs[0].ref_type, + RefType::CrossThread as i32 + ); + assert_eq!(segment.spans[0].refs[0].parent_span_id, 2); + assert_eq!(segment.spans[0].refs[0].parent_service, "producer"); + assert_eq!(segment.spans[0].refs[0].parent_service_instance, "node_0"); + assert_eq!(segment.spans[0].refs[0].parent_endpoint, "async-job"); + assert_eq!(segment.spans[0].refs[0].network_address_used_at_peer, ""); + } else { + panic!( + "unknown operation_name {}", + segment.spans.last().unwrap().operation_name + ); + } + } else { + panic!("unknown service {}", segment.service); + } +} + +async fn meter() { + let consumer = create_consumer("skywalking-meters"); + + for _ in 0..3 { + let meter: MeterData = consumer_recv(&consumer).await; + check_meter(&meter); + } +} + +fn check_meter(meter: &MeterData) { + dbg!(meter); + + assert_eq!(meter.service, "consumer"); + assert_eq!(meter.service_instance, "node_0"); + + match &meter.metric { + Some(Metric::SingleValue(value)) => { + assert_eq!(value.name, "instance_trace_count"); + + if value.labels[0].name == "region" && value.labels[0].value == "us-west" { + assert_eq!(value.labels[1].name, "az"); + assert_eq!(value.labels[1].value, "az-1"); + assert_eq!(value.value, 30.0); + } else if value.labels[0].name == "region" && value.labels[0].value == "us-east" { + assert_eq!(value.labels[1].name, "az"); + assert_eq!(value.labels[1].value, "az-3"); + assert_eq!(value.value, 20.0); + } else { + panic!("unknown label {:?}", &value.labels[0]); + } + } + Some(Metric::Histogram(value)) => { + assert_eq!(value.name, "instance_trace_count"); + assert_eq!(value.labels[0].name, "region"); + assert_eq!(value.labels[0].value, "us-north"); + assert_eq!(value.labels[1].name, "az"); + assert_eq!(value.labels[1].value, "az-1"); + assert_eq!(value.values[0].bucket, 10.0); + assert_eq!(value.values[0].count, 1); + assert_eq!(value.values[1].bucket, 20.0); + assert_eq!(value.values[1].count, 2); + assert_eq!(value.values[2].bucket, 30.0); + assert_eq!(value.values[2].count, 0); + } + _ => { + panic!("unknown metric"); + } + } +} + +async fn log() { + let consumer = create_consumer("skywalking-logs"); + + for _ in 0..3 { + let log: LogData = consumer_recv(&consumer).await; + check_log(&log); + } +} + +fn check_log(log: &LogData) { + dbg!(log); + + if log.service == "producer" && log.service_instance == "node_0" { + assert_eq!(log.endpoint, "/ping"); + + match &log.body.as_ref().unwrap().content { + Some(Content::Json(json)) => { + assert_eq!(json.json, r#"{"message": "handle ping"}"#); + assert_eq!(log.trace_context, None); + assert_eq!( + log.tags, + Some(LogTags { + data: vec![KeyStringValuePair { + key: "level".to_string(), + value: "DEBUG".to_string() + }] + }) + ); + } + Some(Content::Text(text)) => { + 
                assert_eq!(text.text, "do http request");
+                assert_eq!(log.trace_context.as_ref().unwrap().span_id, 1);
+                assert_eq!(
+                    log.tags,
+                    Some(LogTags {
+                        data: vec![KeyStringValuePair {
+                            key: "level".to_string(),
+                            value: "INFO".to_string()
+                        }]
+                    })
+                );
+            }
+            body => {
+                panic!("unknown log body {:?}", body);
+            }
+        }
+    } else if log.service == "consumer" && log.service_instance == "node_0" {
+        assert_eq!(log.endpoint, "/pong");
+        assert_eq!(
+            log.body.as_ref().unwrap().content,
+            Some(Content::Json(JsonLog {
+                json: r#"{"message": "handle pong"}"#.to_string()
+            }))
+        );
+        assert_eq!(log.trace_context, None);
+        assert_eq!(
+            log.tags,
+            Some(LogTags {
+                data: vec![KeyStringValuePair {
+                    key: "level".to_string(),
+                    value: "DEBUG".to_string()
+                }]
+            })
+        );
+    } else {
+        panic!("unknown log {} {}", log.service, log.service_instance);
+    }
+}
+
+fn create_consumer(topic: &str) -> StreamConsumer {
+    let consumer: StreamConsumer = ClientConfig::new()
+        .set("bootstrap.servers", "broker:9092")
+        .set("broker.address.family", "v4")
+        .set("session.timeout.ms", "30000")
+        .set("enable.auto.commit", "true")
+        .set("auto.offset.reset", "earliest")
+        .set("enable.auto.offset.store", "true")
+        .set("group.id", topic)
+        .create()
+        .unwrap();
+    consumer.subscribe(&[topic]).unwrap();
+    consumer
+}
+
+async fn consumer_recv<T: Message + Default>(consumer: &StreamConsumer) -> T {
+    let message = timeout(Duration::from_secs(12), consumer.recv())
+        .await
+        .unwrap()
+        .unwrap();
+    let value = message.payload_view::<[u8]>().unwrap().unwrap();
+    Message::decode(value).unwrap()
+}
+
+#[tokio::main(flavor = "multi_thread")]
+async fn main() {
+    segment().await;
+    meter().await;
+    log().await;
+}
diff --git a/e2e/src/main.rs b/e2e/src/main.rs
index 545c1c9..bf46cb3 100644
--- a/e2e/src/main.rs
+++ b/e2e/src/main.rs
@@ -29,7 +29,11 @@ use skywalking::{
         meter::{Counter, Gauge, Histogram},
         metricer::Metricer,
     },
-    reporter::grpc::GrpcReporter,
+    reporter::{
+        grpc::GrpcReporter,
+        kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig},
+        CollectItem, Report,
+    },
     trace::{
         propagation::{
             context::SKYWALKING_HTTP_CONTEXT_HEADER_KEY, decoder::decode_propagation,
@@ -40,6 +44,7 @@ use skywalking::{
 };
 use std::{convert::Infallible, error::Error, net::SocketAddr};
 use structopt::StructOpt;
+use tokio::sync::mpsc::{UnboundedReceiver, UnboundedSender};
 
 static NOT_FOUND_MSG: &str = "not found";
 static SUCCESS_MSG: &str = "Success";
@@ -210,11 +215,52 @@ struct Opt {
     mode: String,
 }
 
+#[derive(Clone)]
+struct CombineReporter {
+    grpc_reporter: GrpcReporter<UnboundedSender<CollectItem>, UnboundedReceiver<CollectItem>>,
+    kafka_reporter: KafkaReporter<UnboundedSender<CollectItem>>,
+}
+
+impl Report for CombineReporter {
+    fn report(&self, item: CollectItem) {
+        let typ = match &item {
+            CollectItem::Trace(_) => "trace",
+            CollectItem::Log(_) => "log",
+            CollectItem::Meter(_) => "meter",
+            _ => "unknown",
+        };
+        println!("report item type: {:?}", typ);
+        self.grpc_reporter.report(item.clone());
+        self.kafka_reporter.report(item);
+    }
+}
+
 #[tokio::main]
 async fn main() -> Result<(), Box<dyn Error>> {
     let opt = Opt::from_args();
-    let reporter = GrpcReporter::connect("http://collector:19876").await?;
-    let handle = reporter.reporting().await.spawn();
+
+    let reporter1 = GrpcReporter::connect("http://collector:19876").await?;
+    let handle1 = reporter1.reporting().await.spawn();
+
+    let mut client_config = RDKafkaClientConfig::new();
+    client_config
+        .set("bootstrap.servers", "broker:9092")
+        .set("message.timeout.ms", "6000");
+    let (reporter2, reporting) = KafkaReportBuilder::new(client_config)
+        .with_err_handle(|message, err| {
+            eprintln!(
+                "kafka reporter failed, message: {}, err: {:?}",
+                message, err
+            );
+        })
+        .build()
+        .await?;
+    let handle2 = reporting.spawn();
+
+    let reporter = CombineReporter {
+        grpc_reporter: reporter1,
+        kafka_reporter: reporter2,
+    };
 
     if opt.mode == "consumer" {
         tracer::set_global_tracer(Tracer::new("consumer", "node_0", reporter.clone()));
@@ -229,7 +275,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
         unreachable!()
     }
 
-    handle.await?;
+    handle1.await?;
+    handle2.await?;
 
     Ok(())
 }
diff --git a/src/error/mod.rs b/src/error/mod.rs
index 2a26da6..b87a5b7 100644
--- a/src/error/mod.rs
+++ b/src/error/mod.rs
@@ -44,6 +44,11 @@ pub enum Error {
     #[error("tokio join failed: {0}")]
     TokioJoin(#[from] tokio::task::JoinError),
 
+    /// Kafka reporter error.
+    #[cfg(feature = "kafka-reporter")]
+    #[error("kafka reporter failed: {0}")]
+    KafkaReporter(crate::reporter::kafka::Error),
+
     /// Other uncovered errors.
     #[error(transparent)]
     Other(#[from] Box<dyn std::error::Error + Send>),
diff --git a/src/proto/v3/mod.rs b/src/proto/v3/mod.rs
index 0086949..d470966 100644
--- a/src/proto/v3/mod.rs
+++ b/src/proto/v3/mod.rs
@@ -18,6 +18,7 @@
 
 #![allow(missing_docs)]
 #![allow(rustdoc::invalid_html_tags)]
+#![allow(clippy::derive_partial_eq_without_eq)]
 
 use crate::common::system_time::{fetch_time, TimePeriod};
diff --git a/src/reporter/grpc.rs b/src/reporter/grpc.rs
index 5abb88f..31676e7 100644
--- a/src/reporter/grpc.rs
+++ b/src/reporter/grpc.rs
@@ -16,6 +16,7 @@
 
 //! Grpc implementation of [Report].
 
+use super::{CollectItemConsume, CollectItemProduce};
 #[cfg(feature = "management")]
 use crate::proto::v3::management_service_client::ManagementServiceClient;
 use crate::{
@@ -51,7 +52,6 @@ use tokio::{
 };
 use tokio_stream::StreamExt;
 use tonic::{
-    async_trait,
     metadata::{Ascii, MetadataValue},
     service::{interceptor::InterceptedService, Interceptor},
     transport::{self, Channel, Endpoint},
@@ -71,66 +71,6 @@ fn default_status_handle(message: &str, status: &Status) {
     error!(?status, "{}", message);
 }
 
-/// Special purpose, used for user-defined production operations. Generally, it
-/// does not need to be handled.
-pub trait CollectItemProduce: Send + Sync + 'static {
-    /// Produce the collect item non-blocking.
-    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error + Send>>;
-}
-
-impl CollectItemProduce for () {
-    fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error + Send>> {
-        Ok(())
-    }
-}
-
-impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
-    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error + Send>> {
-        Ok(self.send(item)?)
-    }
-}
-
-/// Special purpose, used for user-defined consume operations. Generally, it
-/// does not need to be handled.
-#[async_trait]
-pub trait CollectItemConsume: Send + Sync + 'static {
-    /// Consume the collect item blocking.
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
-
-    /// Try to consume the collect item non-blocking.
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>>;
-}
-
-#[async_trait]
-impl CollectItemConsume for () {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(None)
-    }
-
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(None)
-    }
-}
-
-#[async_trait]
-impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
-    async fn consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        Ok(self.recv().await)
-    }
-
-    async fn try_consume(&mut self) -> Result<Option<CollectItem>, Box<dyn Error + Send>> {
-        use mpsc::error::TryRecvError;
-
-        match self.try_recv() {
-            Ok(item) => Ok(Some(item)),
-            Err(e) => match e {
-                TryRecvError::Empty => Ok(None),
-                TryRecvError::Disconnected => Err(Box::new(e)),
-            },
-        }
-    }
-}
-
 #[derive(Default, Clone)]
 struct CustomInterceptor {
     authentication: Option<Arc<String>>,
diff --git a/src/reporter/kafka.rs b/src/reporter/kafka.rs
new file mode 100644
index 0000000..aff55ac
--- /dev/null
+++ b/src/reporter/kafka.rs
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+//
+
+//! Kafka implementation of [Report].
+
+use super::{CollectItemConsume, CollectItemProduce};
+use crate::reporter::{CollectItem, Report};
+pub use rdkafka::config::{ClientConfig as RDKafkaClientConfig, RDKafkaLogLevel};
+use rdkafka::producer::{FutureProducer, FutureRecord};
+use std::{
+    error,
+    future::{pending, Future},
+    pin::Pin,
+    sync::{
+        atomic::{AtomicBool, Ordering::Relaxed},
+        Arc,
+    },
+    time::Duration,
+};
+use tokio::{select, spawn, sync::mpsc, task::JoinHandle, try_join};
+use tracing::error;
+
+/// Kafka reporter error.
+#[derive(Debug, thiserror::Error)]
+pub enum Error {
+    /// rdkafka error.
+    #[error(transparent)]
+    RdKafka(#[from] rdkafka::error::KafkaError),
+
+    /// Kafka topic not found.
+    #[error("topic not found: {topic}")]
+    TopicNotFound {
+        /// Name of the Kafka topic.
+        topic: String,
+    },
+}
+
+type DynErrHandler = dyn Fn(&str, &dyn error::Error) + Send + Sync + 'static;
+
+fn default_err_handle(message: &str, err: &dyn error::Error) {
+    error!(?err, "{}", message);
+}
+
+#[derive(Default)]
+struct State {
+    is_closing: AtomicBool,
+}
+
+impl State {
+    fn is_closing(&self) -> bool {
+        self.is_closing.load(Relaxed)
+    }
+}
+
+/// The Kafka reporter plugin supports reporting traces, metrics, logs, and
+/// instance properties to a Kafka cluster.
+pub struct KafkaReportBuilder<P, C> {
+    state: Arc<State>,
+    producer: Arc<P>,
+    consumer: C,
+    client_config: RDKafkaClientConfig,
+    namespace: Option<String>,
+    err_handle: Arc<DynErrHandler>,
+}
+
+impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
+    /// Create the builder with an rdkafka client configuration.
+    pub fn new(client_config: RDKafkaClientConfig) -> Self {
+        let (producer, consumer) = mpsc::unbounded_channel();
+        Self::new_with_pc(client_config, producer, consumer)
+    }
+}
+
+impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
+    /// Special purpose, used for user-defined produce and consume operations;
+    /// usually [KafkaReportBuilder::new] is enough.
+    pub fn new_with_pc(client_config: RDKafkaClientConfig, producer: P, consumer: C) -> Self {
+        Self {
+            state: Default::default(),
+            producer: Arc::new(producer),
+            consumer,
+            client_config,
+            namespace: None,
+            err_handle: Arc::new(default_err_handle),
+        }
+    }
+
+    /// Set the error handler. By default, errors are logged.
+    pub fn with_err_handle(
+        mut self,
+        handle: impl Fn(&str, &dyn error::Error) + Send + Sync + 'static,
+    ) -> Self {
+        self.err_handle = Arc::new(handle);
+        self
+    }
+
+    /// Used to isolate multiple OAP servers sharing the same Kafka cluster
+    /// (the namespace is prepended to each Kafka topic name, joined by `-`).
+    pub fn with_namespace(mut self, namespace: impl Into<String>) -> Self {
+        self.namespace = Some(namespace.into());
+        self
+    }
+
+    /// Build the reporter, which implements [Report], for the foreground, and
+    /// the handle that pushes data to Kafka in the background.
+    pub async fn build(self) -> Result<(KafkaReporter<P>, KafkaReporting<C>), Error> {
+        let kafka_producer = KafkaProducer::new(
+            self.client_config.create()?,
+            self.err_handle.clone(),
+            self.namespace,
+        )
+        .await?;
+        Ok((
+            KafkaReporter {
+                state: self.state.clone(),
+                producer: self.producer,
+                err_handle: self.err_handle,
+            },
+            KafkaReporting {
+                state: self.state,
+                consumer: self.consumer,
+                kafka_producer,
+                shutdown_signal: Box::pin(pending()),
+            },
+        ))
+    }
+}
+
+/// The Kafka reporter, which implements [Report].
+pub struct KafkaReporter<P> {
+    state: Arc<State>,
+    producer: Arc<P>,
+    err_handle: Arc<DynErrHandler>,
+}
+
+impl<P> Clone for KafkaReporter<P> {
+    #[inline]
+    fn clone(&self) -> Self {
+        Self {
+            state: self.state.clone(),
+            producer: self.producer.clone(),
+            err_handle: self.err_handle.clone(),
+        }
+    }
+}
+
+impl<P: CollectItemProduce> Report for KafkaReporter<P> {
+    fn report(&self, item: CollectItem) {
+        if !self.state.is_closing() {
+            if let Err(e) = self.producer.produce(item) {
+                (self.err_handle)("report collect item failed", &*e);
+            }
+        }
+    }
+}
+
+/// The handle that pushes data to Kafka.
+pub struct KafkaReporting<C> {
+    state: Arc<State>,
+    consumer: C,
+    kafka_producer: KafkaProducer,
+    shutdown_signal: Pin<Box<dyn Future<Output = ()> + Send + Sync + 'static>>,
+}
+
+impl<C: CollectItemConsume> KafkaReporting<C> {
+    /// Quit when the `shutdown_signal` future completes.
+    ///
+    /// Accepts a `shutdown_signal` argument as a graceful shutdown signal.
+    pub fn with_graceful_shutdown(
+        mut self,
+        shutdown_signal: impl Future<Output = ()> + Send + Sync + 'static,
+    ) -> Self {
+        self.shutdown_signal = Box::pin(shutdown_signal);
+        self
+    }
+
+    /// Spawn the reporting task in the background.
+    pub fn spawn(self) -> ReportingJoinHandle {
+        let handle = spawn(async move {
+            let KafkaReporting {
+                state,
+                mut consumer,
+                mut kafka_producer,
+                shutdown_signal,
+            } = self;
+
+            let (shutdown_tx, mut shutdown_rx) = mpsc::unbounded_channel();
+
+            let work_fut = async move {
+                loop {
+                    select! {
+                        item = consumer.consume() => {
+                            match item {
+                                Ok(Some(item)) => {
+                                    kafka_producer.produce(item).await;
+                                }
+                                Ok(None) => break,
+                                Err(err) => return Err(crate::Error::Other(err)),
+                            }
+                        }
+                        _ = shutdown_rx.recv() => break,
+                    }
+                }
+
+                state.is_closing.store(true, Relaxed);
+
+                // Flush.
+                loop {
+                    match consumer.try_consume().await {
+                        Ok(Some(item)) => {
+                            kafka_producer.produce(item).await;
+                        }
+                        Ok(None) => break,
+                        Err(err) => return Err(err.into()),
+                    }
+                }
+
+                Ok::<_, crate::Error>(())
+            };
+
+            let shutdown_fut = async move {
+                shutdown_signal.await;
+                shutdown_tx
+                    .send(())
+                    .map_err(|e| crate::Error::Other(Box::new(e)))?;
+                Ok(())
+            };
+
+            try_join!(work_fut, shutdown_fut)?;
+
+            Ok(())
+        });
+        ReportingJoinHandle { handle }
+    }
+}
+
+/// Handle of [KafkaReporting::spawn].
+pub struct ReportingJoinHandle {
+    handle: JoinHandle<crate::Result<()>>,
+}
+
+impl Future for ReportingJoinHandle {
+    type Output = crate::Result<()>;
+
+    fn poll(
+        mut self: std::pin::Pin<&mut Self>,
+        cx: &mut std::task::Context<'_>,
+    ) -> std::task::Poll<Self::Output> {
+        Pin::new(&mut self.handle).poll(cx).map(|rs| rs?)
+    }
+}
+
+struct TopicNames {
+    segment: String,
+    meter: String,
+    log: String,
+    #[cfg(feature = "management")]
+    management: String,
+}
+
+impl TopicNames {
+    const TOPIC_LOG: &str = "skywalking-logs";
+    #[cfg(feature = "management")]
+    const TOPIC_MANAGEMENT: &str = "skywalking-managements";
+    const TOPIC_METER: &str = "skywalking-meters";
+    const TOPIC_SEGMENT: &str = "skywalking-segments";
+
+    fn new(namespace: Option<&str>) -> Self {
+        Self {
+            segment: Self::real_topic_name(namespace, Self::TOPIC_SEGMENT),
+            meter: Self::real_topic_name(namespace, Self::TOPIC_METER),
+            log: Self::real_topic_name(namespace, Self::TOPIC_LOG),
+            #[cfg(feature = "management")]
+            management: Self::real_topic_name(namespace, Self::TOPIC_MANAGEMENT),
+        }
+    }
+
+    fn real_topic_name(namespace: Option<&str>, topic_name: &str) -> String {
+        namespace
+            .map(|namespace| format!("{}-{}", namespace, topic_name))
+            .unwrap_or_else(|| topic_name.to_string())
+    }
+}
+
+struct KafkaProducer {
+    topic_names: TopicNames,
+    client: FutureProducer,
+    err_handle: Arc<DynErrHandler>,
+}
+
+impl KafkaProducer {
+    async fn new(
+        client: FutureProducer,
+        err_handle: Arc<DynErrHandler>,
+        namespace: Option<String>,
+    ) -> Result<Self, Error> {
+        let topic_names = TopicNames::new(namespace.as_deref());
+        Ok(Self {
+            client,
+            err_handle,
+            topic_names,
+        })
+    }
+
+    async fn produce(&mut self, item: CollectItem) {
+        let (topic_name, key) = match &item {
+            CollectItem::Trace(item) => (
+                &self.topic_names.segment,
+                item.trace_segment_id.as_bytes().to_vec(),
+            ),
+            CollectItem::Log(item) => (&self.topic_names.log, item.service.as_bytes().to_vec()),
+            CollectItem::Meter(item) => (
+                &self.topic_names.meter,
+                item.service_instance.as_bytes().to_vec(),
+            ),
+            #[cfg(feature = "management")]
+            CollectItem::Instance(item) => (
+                &self.topic_names.management,
+                format!("register-{}", &item.service_instance).into_bytes(),
+            ),
+            #[cfg(feature = "management")]
+            CollectItem::Ping(item) => (
+                &self.topic_names.management,
+                item.service_instance.as_bytes().to_vec(),
+            ),
+        };
+
+        let payload = item.encode_to_vec();
+        let record = FutureRecord::to(topic_name).payload(&payload).key(&key);
+
+        if let Err((err, _)) = self.client.send(record, Duration::from_secs(0)).await {
+            (self.err_handle)("Collect data to kafka failed", &err);
+        }
+    }
+}
diff --git a/src/reporter/mod.rs b/src/reporter/mod.rs
index 0b46662..783fcfe 100644
--- a/src/reporter/mod.rs
+++ b/src/reporter/mod.rs
@@ -17,17 +17,21 @@
 //! Reporter contains common `Report` trait and the implementations.
 
 pub mod grpc;
+#[cfg(feature = "kafka-reporter")]
+#[cfg_attr(docsrs, doc(cfg(feature = "kafka-reporter")))]
+pub mod kafka;
 pub mod print;
 
 #[cfg(feature = "management")]
 use crate::proto::v3::{InstancePingPkg, InstanceProperties};
 use crate::proto::v3::{LogData, MeterData, SegmentObject};
 use serde::{Deserialize, Serialize};
-use std::{ops::Deref, sync::Arc};
-use tokio::sync::OnceCell;
+use std::{error::Error, ops::Deref, sync::Arc};
+use tokio::sync::{mpsc, OnceCell};
+use tonic::async_trait;
 
 /// Collect item of protobuf object.
-#[derive(Debug, Serialize, Deserialize)]
+#[derive(Debug, Clone, Serialize, Deserialize)]
 #[non_exhaustive]
 pub enum CollectItem {
     /// Tracing object.
@@ -46,6 +50,23 @@ pub enum CollectItem {
     Ping(Box<InstancePingPkg>),
 }
 
+impl CollectItem {
+    #[cfg(feature = "kafka-reporter")]
+    pub(crate) fn encode_to_vec(self) -> Vec<u8> {
+        use prost::Message;
+
+        match self {
+            CollectItem::Trace(item) => item.encode_to_vec(),
+            CollectItem::Log(item) => item.encode_to_vec(),
+            CollectItem::Meter(item) => item.encode_to_vec(),
+            #[cfg(feature = "management")]
+            CollectItem::Instance(item) => item.encode_to_vec(),
+            #[cfg(feature = "management")]
+            CollectItem::Ping(item) => item.encode_to_vec(),
+        }
+    }
+}
+
 pub(crate) type DynReport = dyn Report + Send + Sync + 'static;
 
 /// Report provide non-blocking report method for trace, log and metric object.
@@ -76,3 +97,91 @@ impl<T: Report> Report for OnceCell<T> {
         Report::report(self.get().expect("OnceCell is empty"), item)
     }
 }
+
+/// Special purpose, used for user-defined production operations. Generally, it
+/// does not need to be handled.
+pub trait CollectItemProduce: Send + Sync + 'static {
+    /// Produce the collect item non-blocking.
+    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error + Send>>;
+}
+
+impl CollectItemProduce for () {
+    fn produce(&self, _item: CollectItem) -> Result<(), Box<dyn Error + Send>> {
+        Ok(())
+    }
+}
+
+impl CollectItemProduce for mpsc::Sender<CollectItem> {
+    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error + Send>> {
+        Ok(self.blocking_send(item)?)
+    }
+}
+
+impl CollectItemProduce for mpsc::UnboundedSender<CollectItem> {
+    fn produce(&self, item: CollectItem) -> Result<(), Box<dyn Error + Send>> {
+        Ok(self.send(item)?)
+    }
+}
+
+/// Alias of method result of [CollectItemConsume].
+pub type ConsumeResult = Result<Option<CollectItem>, Box<dyn Error + Send>>;
+
+/// Special purpose, used for user-defined consume operations. Generally, it
+/// does not need to be handled.
+#[async_trait]
+pub trait CollectItemConsume: Send + Sync + 'static {
+    /// Consume the collect item blocking.
+    async fn consume(&mut self) -> ConsumeResult;
+
+    /// Try to consume the collect item non-blocking.
+    async fn try_consume(&mut self) -> ConsumeResult;
+}
+
+#[async_trait]
+impl CollectItemConsume for () {
+    async fn consume(&mut self) -> ConsumeResult {
+        Ok(None)
+    }
+
+    async fn try_consume(&mut self) -> ConsumeResult {
+        Ok(None)
+    }
+}
+
+#[async_trait]
+impl CollectItemConsume for mpsc::Receiver<CollectItem> {
+    async fn consume(&mut self) -> ConsumeResult {
+        Ok(self.recv().await)
+    }
+
+    async fn try_consume(&mut self) -> ConsumeResult {
+        use mpsc::error::TryRecvError;
+
+        match self.try_recv() {
+            Ok(item) => Ok(Some(item)),
+            Err(e) => match e {
+                TryRecvError::Empty => Ok(None),
+                TryRecvError::Disconnected => Err(Box::new(e)),
+            },
+        }
+    }
+}
+
+#[async_trait]
+impl CollectItemConsume for mpsc::UnboundedReceiver<CollectItem> {
+    async fn consume(&mut self) -> ConsumeResult {
+        Ok(self.recv().await)
+    }
+
+    async fn try_consume(&mut self) -> ConsumeResult {
+        use mpsc::error::TryRecvError;
+
+        match self.try_recv() {
+            Ok(item) => Ok(Some(item)),
+            Err(e) => match e {
+                TryRecvError::Empty => Ok(None),
+                TryRecvError::Disconnected => Err(Box::new(e)),
+            },
+        }
+    }
+}
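A minimal usage sketch of the API added above (assumptions: a tokio runtime and a broker reachable at `localhost:9092`). It passes an explicit channel pair through `KafkaReportBuilder::new_with_pc`, the same unbounded pair that `KafkaReportBuilder::new` creates internally, to show where custom `CollectItemProduce` / `CollectItemConsume` implementations plug in, and combines it with `with_graceful_shutdown`:

```rust
use skywalking::reporter::kafka::{KafkaReportBuilder, RDKafkaClientConfig};
use tokio::sync::mpsc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let mut client_config = RDKafkaClientConfig::new();
    client_config
        .set("bootstrap.servers", "localhost:9092") // assumed broker address
        .set("message.timeout.ms", "6000");

    // An unbounded channel implements CollectItemProduce (sender side) and
    // CollectItemConsume (receiver side); a custom queue could be used instead.
    let (producer, consumer) = mpsc::unbounded_channel();
    let (reporter, reporting) = KafkaReportBuilder::new_with_pc(client_config, producer, consumer)
        // Topics become e.g. `staging-skywalking-segments`.
        .with_namespace("staging")
        .build()
        .await?;

    // The background task drains queued items, then exits after ctrl-c.
    let handle = reporting
        .with_graceful_shutdown(async {
            tokio::signal::ctrl_c().await.expect("failed to listen for ctrl-c");
        })
        .spawn();

    // ... hand `reporter` to a Tracer / Logger / Metricer here ...
    drop(reporter);

    handle.await?;
    Ok(())
}
```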