4 changes: 2 additions & 2 deletions README.md
@@ -226,14 +226,14 @@ Make sure the feature `kafka-reporter` is enabled.
#[cfg(feature = "kafka-reporter")]
mod example {
    use skywalking::reporter::Report;
-    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig};
+    use skywalking::reporter::kafka::{KafkaReportBuilder, KafkaReporter, ClientConfig};

    async fn do_something(reporter: &impl Report) {
        // ....
    }

    async fn foo() {
-        let mut client_config = RDKafkaClientConfig::new();
+        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", "broker:9092")
            .set("message.timeout.ms", "6000");
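For orientation, the README example after this change reads roughly as follows. This is a sketch, not the verbatim file: the `build()` signature comes from the src/reporter/kafka.rs hunk below, while the `spawn()` / `handle.await` shutdown is assumed to mirror the gRPC reporter pattern visible in e2e/src/main.rs.

#[cfg(feature = "kafka-reporter")]
mod example {
    use skywalking::reporter::Report;
    use skywalking::reporter::kafka::{ClientConfig, KafkaReportBuilder};

    async fn do_something(reporter: &impl Report) {
        // ... report traces/logs through `reporter` ...
    }

    async fn foo() {
        // Plain string key/value pairs, as librdkafka expects them.
        let mut client_config = ClientConfig::new();
        client_config
            .set("bootstrap.servers", "broker:9092")
            .set("message.timeout.ms", "6000");

        // `build()` converts to the rdkafka config internally and returns
        // the reporter plus a background reporting task.
        let (reporter, reporting) = KafkaReportBuilder::new(client_config)
            .build()
            .await
            .expect("failed to build kafka reporter");
        let handle = reporting.spawn();

        do_something(&reporter).await;

        handle.await.expect("reporting task failed");
    }
}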
4 changes: 2 additions & 2 deletions e2e/src/main.rs
@@ -35,7 +35,7 @@ use skywalking::{
    reporter::{
        CollectItem, Report,
        grpc::GrpcReporter,
-        kafka::{KafkaReportBuilder, KafkaReporter, RDKafkaClientConfig},
+        kafka::{ClientConfig, KafkaReportBuilder, KafkaReporter},
    },
    trace::{
        propagation::{
@@ -252,7 +252,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
    let reporter1 = GrpcReporter::connect("http://127.0.0.1:19876").await?;
    let handle1 = reporter1.reporting().await.spawn();

-    let mut client_config = RDKafkaClientConfig::new();
+    let mut client_config = ClientConfig::new();
    client_config
        .set("bootstrap.servers", "127.0.0.1:9092")
        .set("message.timeout.ms", "6000")
100 changes: 93 additions & 7 deletions src/reporter/kafka.rs
@@ -18,9 +18,12 @@

use super::{CollectItemConsume, CollectItemProduce};
use crate::reporter::{CollectItem, Report};
-pub use rdkafka::config::{ClientConfig as RDKafkaClientConfig, RDKafkaLogLevel};
-use rdkafka::producer::{FutureProducer, FutureRecord};
+use rdkafka::{
+    config::ClientConfig as RDKafkaClientConfig,
+    producer::{FutureProducer, FutureRecord},
+};
use std::{
+    collections::HashMap,
    error,
    future::{Future, pending},
    pin::Pin,
@@ -48,6 +51,89 @@ pub enum Error {
    },
}

+/// Log level for Kafka client.
+#[derive(Debug, Clone, Copy)]
+pub enum LogLevel {
+    /// Critical level.
+    Critical,
+    /// Error level.
+    Error,
+    /// Warning level.
+    Warning,
+    /// Notice level.
+    Notice,
+    /// Info level.
+    Info,
+    /// Debug level.
+    Debug,
+}
+
+impl From<LogLevel> for rdkafka::config::RDKafkaLogLevel {
+    fn from(level: LogLevel) -> Self {
+        match level {
+            LogLevel::Critical => rdkafka::config::RDKafkaLogLevel::Critical,
+            LogLevel::Error => rdkafka::config::RDKafkaLogLevel::Error,
+            LogLevel::Warning => rdkafka::config::RDKafkaLogLevel::Warning,
+            LogLevel::Notice => rdkafka::config::RDKafkaLogLevel::Notice,
+            LogLevel::Info => rdkafka::config::RDKafkaLogLevel::Info,
+            LogLevel::Debug => rdkafka::config::RDKafkaLogLevel::Debug,
+        }
+    }
+}
+
+/// Configuration for Kafka client.
+#[derive(Debug, Clone)]
+pub struct ClientConfig {
+    /// Configuration parameters as key-value pairs.
+    params: HashMap<String, String>,
+    /// Log level for the client.
+    log_level: Option<LogLevel>,
+}
+
+impl ClientConfig {
+    /// Create a new empty configuration.
+    pub fn new() -> Self {
+        Self {
+            params: HashMap::new(),
+            log_level: None,
+        }
+    }
+
+    /// Set a configuration parameter.
+    pub fn set<K, V>(&mut self, key: K, value: V) -> &mut Self
+    where
+        K: Into<String>,
+        V: Into<String>,
+    {
+        self.params.insert(key.into(), value.into());
+        self
+    }
+
+    /// Set log level.
+    pub fn set_log_level(&mut self, level: LogLevel) -> &mut Self {
+        self.log_level = Some(level);
+        self
+    }
+
+    /// Convert to rdkafka ClientConfig.
+    fn to_rdkafka_config(&self) -> RDKafkaClientConfig {
+        let mut config = RDKafkaClientConfig::new();
+        for (key, value) in &self.params {
+            config.set(key, value);
+        }
+        if let Some(log_level) = self.log_level {
+            config.set_log_level(log_level.into());
+        }
+        config
+    }
+}
+
+impl Default for ClientConfig {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
type DynErrHandler = dyn Fn(&str, &dyn error::Error) + Send + Sync + 'static;

fn default_err_handle(message: &str, err: &dyn error::Error) {
@@ -71,14 +157,14 @@ pub struct KafkaReportBuilder<P, C> {
    state: Arc<State>,
    producer: Arc<P>,
    consumer: C,
-    client_config: RDKafkaClientConfig,
+    client_config: ClientConfig,
    namespace: Option<String>,
    err_handle: Arc<DynErrHandler>,
}

impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
-    /// Create builder, with rdkafka client configuration.
-    pub fn new(client_config: RDKafkaClientConfig) -> Self {
+    /// Create builder, with client configuration.
+    pub fn new(client_config: ClientConfig) -> Self {
        let (producer, consumer) = mpsc::unbounded_channel();
        Self::new_with_pc(client_config, producer, consumer)
    }
@@ -87,7 +173,7 @@ impl KafkaReportBuilder<mpsc::UnboundedSender<CollectItem>, mpsc::UnboundedReceiver<CollectItem>> {
impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
    /// Special purpose, used for user-defined produce and consume operations,
    /// usually you can use [KafkaReportBuilder::new].
-    pub fn new_with_pc(client_config: RDKafkaClientConfig, producer: P, consumer: C) -> Self {
+    pub fn new_with_pc(client_config: ClientConfig, producer: P, consumer: C) -> Self {
        Self {
            state: Default::default(),
            producer: Arc::new(producer),
@@ -118,7 +204,7 @@ impl<P: CollectItemProduce, C: CollectItemConsume> KafkaReportBuilder<P, C> {
    /// handle to push data to kafka in the background.
    pub async fn build(self) -> Result<(KafkaReporter<P>, KafkaReporting<C>), Error> {
        let kafka_producer = KafkaProducer::new(
-            self.client_config.create()?,
+            self.client_config.to_rdkafka_config().create()?,
            self.err_handle.clone(),
            self.namespace,
        )
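Taken together, the new wrapper keeps rdkafka out of the crate's public surface: callers set plain string options and a `LogLevel`, and the private `to_rdkafka_config()` performs the translation inside `build()`. A minimal sketch against the API added above (the broker address is illustrative):

use skywalking::reporter::kafka::{ClientConfig, LogLevel};

fn configure() -> ClientConfig {
    let mut config = ClientConfig::new();
    config
        .set("bootstrap.servers", "127.0.0.1:9092")
        .set("message.timeout.ms", "6000")
        // Both setters return &mut Self, so the log level chains like any
        // other option instead of requiring a separate rdkafka enum import.
        .set_log_level(LogLevel::Debug);
    config
}

// The config is then handed to the builder, e.g.
// KafkaReportBuilder::new(configure()), and rdkafka types never
// appear in caller code.

Since `LogLevel` mirrors `RDKafkaLogLevel` variant for variant, the rdkafka dependency can later be upgraded or swapped without breaking this public API.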