Skip to content

Commit

Permalink
Implement Send + Sync on BorrowedMessage
Browse files Browse the repository at this point in the history
librdkafka is entirely thread safe, and guarantees that the memory
referenced by a BorrowedMessage will remain valid as long as the
consumer remains valid. Since BorrowedMessage has a reference to the
consumer inside, Rust will enforce this invariant, and it is therefore
safe to mark BorrowedMessage as Send + Sync.

Also update the asynchronous_processing example to make use of this new
feature. The example now allows spawning multiple workers, which
requires tokio::spawn, which turn requires that the generated future
implement Send, which finally requires the adjustment made in this
patch.

Fix fede1024#85.
Fix fede1024#189.
  • Loading branch information
benesch committed Dec 9, 2019
1 parent 4b51170 commit 2779f77
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 37 deletions.
107 changes: 70 additions & 37 deletions examples/asynchronous_processing.rs
@@ -1,21 +1,28 @@
use std::thread;
use std::time::Duration;

use clap::{App, Arg};
use futures::{future, TryStreamExt};
use log::{info, warn};
use clap::{App, Arg, value_t};
use futures::{StreamExt, TryStreamExt};
use futures::stream::FuturesUnordered;
use log::info;

use rdkafka::config::ClientConfig;
use rdkafka::consumer::stream_consumer::StreamConsumer;
use rdkafka::consumer::Consumer;
use rdkafka::message::OwnedMessage;
use rdkafka::message::{BorrowedMessage, OwnedMessage};
use rdkafka::producer::{FutureProducer, FutureRecord};
use rdkafka::Message;

use crate::example_utils::setup_logger;

mod example_utils;

async fn record_message_receipt(msg: &BorrowedMessage<'_>) {
// Simulate some work that must be done in the same order as messages are
// received; i.e., before truly parallel processing can begin.
info!("Message received: {}", msg.offset());
}

// Emulates an expensive, synchronous computation.
fn expensive_computation<'a>(msg: OwnedMessage) -> String {
info!("Starting expensive computation on message {}", msg.offset());
Expand All @@ -39,58 +46,65 @@ fn expensive_computation<'a>(msg: OwnedMessage) -> String {
// `tokio::spawn` is used to handle IO-bound tasks in parallel (e.g., producing
// the messages), while `tokio::task::spawn_blocking` is used to handle the
// simulated CPU-bound task.
async fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_topic: &str) {
async fn run_async_processor(
brokers: String,
group_id: String,
input_topic: String,
output_topic: String,
) {
// Create the `StreamConsumer`, to receive the messages from the topic in form of a `Stream`.
let consumer: StreamConsumer = ClientConfig::new()
.set("group.id", group_id)
.set("bootstrap.servers", brokers)
.set("group.id", &group_id)
.set("bootstrap.servers", &brokers)
.set("enable.partition.eof", "false")
.set("session.timeout.ms", "6000")
.set("enable.auto.commit", "false")
.create()
.expect("Consumer creation failed");

consumer
.subscribe(&[input_topic])
.subscribe(&[&input_topic])
.expect("Can't subscribe to specified topic");

// Create the `FutureProducer` to produce asynchronously.
let producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("bootstrap.servers", &brokers)
.set("message.timeout.ms", "5000")
.create()
.expect("Producer creation error");

// Create the outer pipeline on the message stream.
let stream_processor = consumer.start().try_for_each(|borrowed_message| {
// Process each message
info!("Message received: {}", borrowed_message.offset());
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
let output_topic = output_topic.to_string();
let producer = producer.clone();
tokio::spawn(async move {
// The body of this block will be executed on the main thread pool,
// but we perform `expensive_computation` on a separate thread pool
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
let computation_result =
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
.await
.expect("failed to wait for expensive computation");
let produce_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
0,
);
match produce_future.await {
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
Ok(Err((e, _))) => println!("Error: {:?}", e),
Err(_) => println!("Future cancelled"),
}
});
future::ready(Ok(()))
let output_topic = output_topic.to_string();
async move {
// Process each message
record_message_receipt(&borrowed_message).await;
// Borrowed messages can't outlive the consumer they are received from, so they need to
// be owned in order to be sent to a separate thread.
let owned_message = borrowed_message.detach();
tokio::spawn(async move {
// The body of this block will be executed on the main thread pool,
// but we perform `expensive_computation` on a separate thread pool
// for CPU-intensive tasks via `tokio::task::spawn_blocking`.
let computation_result =
tokio::task::spawn_blocking(|| expensive_computation(owned_message))
.await
.expect("failed to wait for expensive computation");
let produce_future = producer.send(
FutureRecord::to(&output_topic)
.key("some key")
.payload(&computation_result),
0,
);
match produce_future.await {
Ok(Ok(delivery)) => println!("Sent: {:?}", delivery),
Ok(Err((e, _))) => println!("Error: {:?}", e),
Err(_) => println!("Future cancelled"),
}
});
Ok(())
}
});

info!("Starting event loop");
Expand Down Expand Up @@ -139,6 +153,13 @@ async fn main() {
.takes_value(true)
.required(true),
)
.arg(
Arg::with_name("num-workers")
.long("num-workers")
.help("Number of workers")
.takes_value(true)
.default_value("1"),
)
.get_matches();

setup_logger(true, matches.value_of("log-conf"));
Expand All @@ -147,6 +168,18 @@ async fn main() {
let group_id = matches.value_of("group-id").unwrap();
let input_topic = matches.value_of("input-topic").unwrap();
let output_topic = matches.value_of("output-topic").unwrap();
let num_workers = value_t!(matches, "num-workers", usize).unwrap();

run_async_processor(brokers, group_id, input_topic, output_topic).await
(0..num_workers)
.map(|_| {
tokio::spawn(run_async_processor(
brokers.to_owned(),
group_id.to_owned(),
input_topic.to_owned(),
output_topic.to_owned(),
))
})
.collect::<FuturesUnordered<_>>()
.for_each(|_| async { () })
.await
}
3 changes: 3 additions & 0 deletions src/message.rs
Expand Up @@ -362,6 +362,9 @@ impl<'a> Drop for BorrowedMessage<'a> {
}
}

unsafe impl<'a> Send for BorrowedMessage<'a> {}
unsafe impl<'a> Sync for BorrowedMessage<'a> {}

//
// ********** OWNED MESSAGE **********
//
Expand Down

0 comments on commit 2779f77

Please sign in to comment.