Skip to content

Commit

Permalink
Merge pull request #53 from wrl/master
Browse files Browse the repository at this point in the history
Directly return a DeliveryFuture from FutureProducer::send_copy
  • Loading branch information
fede1024 committed Jul 2, 2017
2 parents b7e51eb + 9ef36cc commit 1c2f5c4
Show file tree
Hide file tree
Showing 6 changed files with 40 additions and 19 deletions.
2 changes: 1 addition & 1 deletion examples/asynchronous_processing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ fn run_async_processor(brokers: &str, group_id: &str, input_topic: &str, output_
}).and_then(move |computation_result| {
// Send the result of the computation to Kafka, asynchronously.
info!("Sending result");
producer.send_copy::<String, ()>(&topic_name, None, Some(&computation_result), None, None).unwrap()
producer.send_copy::<String, ()>(&topic_name, None, Some(&computation_result), None, None)
}).and_then(|d_report| {
// Once the message has been produced, print the delivery report and terminate
// the pipeline.
Expand Down
3 changes: 1 addition & 2 deletions examples/at_least_once.rs
Original file line number Diff line number Diff line change
Expand Up @@ -148,8 +148,7 @@ fn main() {
join_all(
output_topics.iter()
.map(|output_topic|
producer.send_copy(output_topic, None, m.payload(), m.key(), None) // TODO: fix timestamp
.expect("Failed to produce message")))
producer.send_copy(output_topic, None, m.payload(), m.key(), None))) // TODO: fix timestamp
.wait()
.expect("Message delivery failed for some topic");
// Now that the message is completely processed, add it's position to the offset
Expand Down
1 change: 0 additions & 1 deletion examples/simple_producer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ fn produce(brokers: &str, topic_name: &str) {
// The send operation on the topic returns a future, that will be completed once the
// result or failure from Kafka will be received.
producer.send_copy(topic_name, None, Some(&value), Some(&vec![0, 1, 2, 3]), None)
.expect("Production failed")
.map(move |delivery_status| { // This will be executed onw the result is received
info!("Delivery status for message {} received", i);
delivery_status
Expand Down
7 changes: 6 additions & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ pub enum KafkaError {
TopicConfig(RDKafkaConfRes, String, String, String),
TopicCreation(String),
Global(RDKafkaError),
FutureCanceled
}

impl fmt::Debug for KafkaError {
Expand All @@ -68,6 +69,7 @@ impl fmt::Debug for KafkaError {
KafkaError::TopicConfig(_, ref desc, ref key, ref value) => write!(f, "KafkaError (Topic config error: {} {} {})", desc, key, value),
KafkaError::TopicCreation(ref err) => write!(f, "KafkaError (Topic creation error: {})", err),
KafkaError::Global(err) => write!(f, "KafkaError (Global error: {})", err),
KafkaError::FutureCanceled => write!(f, "KafkaError (Future canceled)")
}
}
}
Expand All @@ -92,6 +94,7 @@ impl fmt::Display for KafkaError {
KafkaError::TopicConfig(_, ref desc, ref key, ref value) => write!(f, "Topic config error: {} {} {}", desc, key, value),
KafkaError::TopicCreation(ref err) => write!(f, "Topic creation error: {}", err),
KafkaError::Global(err) => write!(f, "Global error: {}", err),
KafkaError::FutureCanceled => write!(f, "Future canceled")
}
}
}
Expand All @@ -116,6 +119,7 @@ impl error::Error for KafkaError {
KafkaError::TopicConfig(_, _, _, _) => "Topic config error",
KafkaError::TopicCreation(_) => "Topic creation error",
KafkaError::Global(_) => "Global error",
KafkaError::FutureCanceled => "Future canceled"
}
}

Expand All @@ -137,7 +141,8 @@ impl error::Error for KafkaError {
KafkaError::Subscription(_) => None,
KafkaError::TopicConfig(_, _, _, _) => None,
KafkaError::TopicCreation(_) => None,
KafkaError::Global(ref err) => Some(err)
KafkaError::Global(ref err) => Some(err),
KafkaError::FutureCanceled => None
}
}
}
Expand Down
43 changes: 31 additions & 12 deletions src/producer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
//! Producer implementations.
use futures::{self, Canceled, Complete, Future, Poll, Oneshot};
use futures::{self, Canceled, Complete, Future, Poll, Oneshot, Async};
use rdsys::rd_kafka_vtype_t::*;
use rdsys::types::*;
use rdsys;
Expand Down Expand Up @@ -225,10 +225,10 @@ impl<C: Context + 'static> Context for FutureProducerContext<C> {
}

impl<C: Context + 'static> ProducerContext for FutureProducerContext<C> {
type DeliveryContext = Complete<DeliveryReport>;
type DeliveryContext = Complete<KafkaResult<DeliveryReport>>;

fn delivery(&self, status: DeliveryReport, tx: Complete<DeliveryReport>) {
tx.complete(status);
fn delivery(&self, status: DeliveryReport, tx: Complete<KafkaResult<DeliveryReport>>) {
let _ = tx.send(Ok(status));
}
}

Expand Down Expand Up @@ -296,12 +296,19 @@ impl<C: Context + 'static> _FutureProducer<C> {
payload: Option<&P>,
key: Option<&K>,
timestamp: Option<i64>
) -> KafkaResult<DeliveryFuture>
) -> DeliveryFuture
where K: ToBytes + ?Sized,
P: ToBytes + ?Sized {
let (tx, rx) = futures::oneshot();
self.producer.send_copy(topic_name, partition, payload, key, Some(Box::new(tx)), timestamp)?;
Ok(DeliveryFuture{rx: rx})

match self.producer.send_copy(topic_name, partition, payload, key, Some(Box::new(tx)), timestamp) {
Ok(_) => DeliveryFuture{ rx },
Err(e) => {
let (tx, rx) = futures::oneshot();
let _ = tx.send(Err(e));
DeliveryFuture { rx }
}
}
}
}

Expand Down Expand Up @@ -351,15 +358,27 @@ impl<C: Context + 'static> FromClientConfigAndContext<C> for FutureProducer<C> {
/// A future that will receive a `DeliveryReport` containing information on the
/// delivery status of the message.
pub struct DeliveryFuture {
rx: Oneshot<DeliveryReport>,
rx: Oneshot<KafkaResult<DeliveryReport>>,
}

impl DeliveryFuture {
pub fn close(&mut self) {
self.rx.close();
}
}

impl Future for DeliveryFuture {
type Item = DeliveryReport;
type Error = Canceled;
type Error = KafkaError;

fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
match self.rx.poll() {
Ok(Async::NotReady) => Ok(Async::NotReady),
Err(Canceled) => Err(KafkaError::FutureCanceled),

fn poll(&mut self) -> Poll<DeliveryReport, Canceled> {
self.rx.poll()
Ok(Async::Ready(Ok(delivery_report))) => Ok(Async::Ready(delivery_report)),
Ok(Async::Ready(Err(e))) => Err(e)
}
}
}

Expand All @@ -374,7 +393,7 @@ impl<C: Context + 'static> FutureProducer<C> {
payload: Option<&P>,
key: Option<&K>,
timestamp: Option<i64>
) -> KafkaResult<DeliveryFuture>
) -> DeliveryFuture
where K: ToBytes + ?Sized,
P: ToBytes + ?Sized {
self.inner.send_copy(topic_name, partition, payload, key, timestamp)
Expand Down
3 changes: 1 addition & 2 deletions tests/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,7 @@ pub fn produce_messages<P, K, J, Q>(topic_name: &str, count: i32, value_fn: &P,

let futures = (0..count)
.map(|id| {
let future = producer.send_copy(topic_name, partition, Some(&value_fn(id)), Some(&key_fn(id)), timestamp)
.expect("Production failed");
let future = producer.send_copy(topic_name, partition, Some(&value_fn(id)), Some(&key_fn(id)), timestamp);
(id, future)
}).collect::<Vec<_>>();

Expand Down

0 comments on commit 1c2f5c4

Please sign in to comment.