Skip to content
Permalink
Browse files

initial kafka check-in

  • Loading branch information...
frankmcsherry committed Aug 28, 2017
1 parent 22c35a7 commit e4d59aef36bd6fa39701aed55e82ecd44890619d
Showing with 242 additions and 1 deletion.
  1. +2 −1 .gitignore
  2. +3 −0 kafkaesque/.gitignore
  3. +14 −0 kafkaesque/Cargo.toml
  4. +74 −0 kafkaesque/src/kafka_event.rs
  5. +149 −0 kafkaesque/src/main.rs
@@ -1,2 +1,3 @@
/target
Cargo.lock
/.vscode
Cargo.lock
@@ -0,0 +1,3 @@
/target
/.vscode
Cargo.lock
@@ -0,0 +1,14 @@
[package]
name = "kafkaesque"
version = "0.1.0"
authors = ["Frank McSherry <fmcsherry@me.com>"]

[dependencies]
# Pinned to the 2.x series: src/main.rs uses the builder-style API
# (`App::new`, `Arg::with_name`), and a wildcard requirement would let any
# future major release with a different API be selected.
clap = "2"
abomonation = "0.4.5"

[dependencies.rdkafka]
version = "^0.12.0"

[dependencies.timely]
git = "https://github.com/frankmcsherry/timely-dataflow"
@@ -0,0 +1,74 @@
use abomonation::Abomonation;
use timely::dataflow::operators::capture::event::{Event, EventPusher, EventIterator};

use rdkafka::Message;
use rdkafka::producer::{BaseProducer, ProducerContext};
use rdkafka::consumer::{BaseConsumer, ConsumerContext};


/// An `EventPusher<T, D>` that publishes every pushed event as one message on a Kafka topic.
///
/// Each event is serialized with `abomonation` into `buffer` and the resulting
/// bytes are handed to the wrapped `BaseProducer`.
pub struct EventProducer<C: ProducerContext, T, D> {
/// Name of the topic every message is sent to.
topic: String,
/// Scratch space that receives the serialized bytes of the event being sent.
buffer: Vec<u8>,
/// Producer through which the serialized events are sent.
producer: BaseProducer<C>,
/// Ties the otherwise unused `T` and `D` parameters to the type.
phant: ::std::marker::PhantomData<(T,D)>,
}

impl<C: ProducerContext, T, D> EventProducer<C, T, D> {
    /// Builds an `EventProducer` that sends its messages to `topic` through the producer `p`.
    ///
    /// The internal serialization buffer starts out empty.
    pub fn new(p: BaseProducer<C>, topic: String) -> Self {
        let buffer = Vec::new();
        EventProducer {
            topic,
            buffer,
            producer: p,
            phant: ::std::marker::PhantomData,
        }
    }
}

impl<C: ProducerContext, T: Abomonation, D: Abomonation> EventPusher<T, D> for EventProducer<C, T, D> {
    /// Serializes `event` and sends it as a single message to the configured topic.
    ///
    /// # Panics
    ///
    /// Panics if the producer returns an error when the message is handed over.
    fn push(&mut self, event: Event<T, D>) {
        // Append the binary form of `event` to the scratch buffer.
        // NOTE(review): soundness relies on the `Abomonation` impls of `T` and `D` — confirm.
        unsafe {
            ::abomonation::encode(&event, &mut self.buffer);
        }

        // Scope the payload borrow so the buffer can be mutated again below.
        let outcome = {
            let payload: &[u8] = self.buffer.as_slice();
            self.producer
                .send_copy::<[u8], ()>(self.topic.as_str(), None, Some(payload), None, None, None)
        };
        outcome.unwrap();

        // Leave the buffer empty so the next event does not get appended to this one.
        self.buffer.clear();
    }
}


/// An `EventIterator<T, D>` that reads events from messages polled off a Kafka consumer.
///
/// NOTE(review): nothing in this type subscribes the consumer to a topic; presumably
/// the caller does so before handing the consumer over — confirm.
pub struct EventConsumer<C: ConsumerContext, T, D> {
/// Consumer that is polled for incoming messages.
consumer: BaseConsumer<C>,
/// Receives a copy of each message payload; decoded events borrow from these bytes.
buffer: Vec<u8>,
/// Ties the otherwise unused `T` and `D` parameters to the type.
phant: ::std::marker::PhantomData<(T,D)>,
}

impl<C: ConsumerContext, T, D> EventConsumer<C, T, D> {
    /// Builds an `EventConsumer` that reads its messages from the consumer `c`.
    ///
    /// The internal payload buffer starts out empty.
    pub fn new(c: BaseConsumer<C>) -> Self {
        let buffer = vec![];
        EventConsumer {
            consumer: c,
            buffer,
            phant: ::std::marker::PhantomData,
        }
    }
}

impl<C: ConsumerContext, T: Abomonation, D: Abomonation> EventIterator<T, D> for EventConsumer<C, T, D> {
    /// Polls the consumer with a zero timeout and decodes the next event, if there is one.
    ///
    /// Returns `None` both when the poll yields no message and when it reports
    /// an error; in the latter case the error is printed to standard output.
    /// The returned reference borrows from the internal buffer.
    ///
    /// # Panics
    ///
    /// Panics if a received message has no payload or if its payload cannot be
    /// decoded as an `Event<T, D>`.
    fn next(&mut self) -> Option<&Event<T, D>> {

        let buffer = &mut self.buffer;
        match self.consumer.poll(0) {
            Ok(result) => {
                result.map(move |message| {
                    // Drop the bytes of the previous message first: decoding
                    // always starts at the front of the buffer, so without this
                    // every call after the first would decode the first event
                    // again and the buffer would grow without bound.
                    buffer.clear();
                    buffer.extend_from_slice(message.payload().unwrap());
                    // NOTE(review): soundness relies on the payload having been
                    // produced by a matching `abomonation::encode` — confirm.
                    unsafe { ::abomonation::decode::<Event<T,D>>(&mut buffer[..]).unwrap().0 }
                })
            },
            Err(err) => {
                println!("KafkaConsumer error: {:?}", err);
                None
            },
        }
    }
}
@@ -0,0 +1,149 @@
extern crate clap;
extern crate rdkafka;
extern crate timely;
extern crate abomonation;

use clap::{App, Arg};

use rdkafka::Message;
use rdkafka::config::{ClientConfig, TopicConfig};
use rdkafka::producer::BaseProducer;
use rdkafka::consumer::BaseConsumer;

mod kafka_event;

fn test_timely() {

use std::rc::Rc;
use std::net::{TcpListener, TcpStream};
use std::sync::{Arc, Mutex};
use timely::dataflow::Scope;
use timely::dataflow::operators::{Capture, ToStream, Inspect};
use timely::dataflow::operators::capture::{EventReader, EventWriter, Replay, Extract};

// get send and recv endpoints, wrap send to share
let (send0, recv0) = ::std::sync::mpsc::channel();
let send0 = Arc::new(Mutex::new(send0));

timely::execute(timely::Configuration::Thread, move |worker| {

// this is only to validate the output.
let send0 = send0.lock().unwrap().clone();

// these allow us to capture / replay a timely stream.
let list = TcpListener::bind("127.0.0.1:8000").unwrap();
let send = TcpStream::connect("127.0.0.1:8000").unwrap();
let recv = list.incoming().next().unwrap().unwrap();

worker.dataflow::<u64,_,_>(|scope1|
(0..10u64)
.to_stream(scope1)
.capture_into(EventWriter::new(send))
);

worker.dataflow::<u64,_,_>(|scope2| {
Some(EventReader::<_,u64,_>::new(recv))
.replay_into(scope2)
.capture_into(send0)
});
}).unwrap();

assert_eq!(recv0.extract()[0].1, (0..10).collect::<Vec<_>>());

}

/// Bounces a message through Kafka ten times using one producer and one consumer.
///
/// A producer and a consumer are created against `brokers`, and the consumer
/// subscribes to `topic_name`. The text `"0"` is sent first; every received
/// message is then sent back with the running receive count appended, until
/// ten messages have been received. Progress is printed to standard output.
///
/// # Errors
///
/// Returns the `KafkaError` of the first client creation, subscription, poll
/// or send that fails.
///
/// # Panics
///
/// Panics if a received message has no payload or its payload is not valid `str` data.
fn round_trip(brokers: &str, topic_name: &str) -> Result<(), rdkafka::error::KafkaError> {

    // Number of messages to receive before the loop ends.
    const ROUNDS: u64 = 10;

    let mut topic_config = TopicConfig::new();
    topic_config
        .set("produce.offset.report", "true")
        .finalize();

    let mut producer_config = ClientConfig::new();
    producer_config
        .set("bootstrap.servers", brokers)
        .set_default_topic_config(topic_config.clone());

    let mut consumer_config = ClientConfig::new();
    consumer_config
        .set("group.id", "example")
        .set("enable.partition.eof", "false")
        .set("session.timeout.ms", "6000")
        .set("enable.auto.commit", "true")
        .set("bootstrap.servers", brokers)
        .set_default_topic_config(topic_config);

    let producer: BaseProducer<_> = producer_config.create()?;
    let consumer: BaseConsumer<_> = consumer_config.create()?;

    consumer.subscribe(&[topic_name])?;

    // give each a chance to sync up?
    consumer.poll(1000)?;
    producer.poll(1000);

    // Kick things off with the initial message.
    let seed = format!("{:?}", 0);
    producer.send_copy::<str,()>(topic_name, None, Some(seed.as_str()), None, None, None)?;
    println!("{:?}:\tsend {:?}", ::std::time::Instant::now(), seed);

    let mut received: u64 = 0;
    let mut empty_polls: u64 = 0;

    while received < ROUNDS {

        producer.poll(0);
        if let Some(message) = consumer.poll(0)? {
            // this *never* seems to trigger.
            received += 1;
            println!("{:?}:\trecv {:?}", ::std::time::Instant::now(), message.payload_view::<str>());
            if received < ROUNDS {
                // Echo the payload back with the current count appended.
                let reply = format!("{}{:?}", message.payload_view::<str>().unwrap().unwrap(), received);
                producer.send_copy::<str,()>(topic_name, None, Some(reply.as_str()), None, None, None)?;
                println!("{:?}:\tsend {:?}", ::std::time::Instant::now(), reply);
            }
        } else {
            // this happens lots.
            empty_polls += 1;
            // Only report at powers of two to keep the output manageable.
            if empty_polls.is_power_of_two() {
                println!("received .. None ({:?} times)", empty_polls);
            }
        }
    }

    Ok(())
}

/// Parses the command line, runs the Kafka round trip and prints whether it succeeded.
///
/// Recognized options are `--brokers`/`-b` (defaults to `localhost:9092`),
/// `--topic`/`-t` (required) and `--log-conf`; the value of `--log-conf` is
/// accepted but not read by this function.
fn main() {
    let brokers_arg = Arg::with_name("brokers")
        .short("b")
        .long("brokers")
        .help("Broker list in kafka format")
        .takes_value(true)
        .default_value("localhost:9092");

    let log_conf_arg = Arg::with_name("log-conf")
        .long("log-conf")
        .help("Configure the logging format (example: 'rdkafka=trace')")
        .takes_value(true);

    let topic_arg = Arg::with_name("topic")
        .short("t")
        .long("topic")
        .help("Destination topic")
        .takes_value(true)
        .required(true);

    let matches = App::new("producer example")
        .version(option_env!("CARGO_PKG_VERSION").unwrap_or(""))
        .about("Simple command line producer")
        .arg(brokers_arg)
        .arg(log_conf_arg)
        .arg(topic_arg)
        .get_matches();

    // "topic" is required and "brokers" has a default, so both values are present.
    let topic = matches.value_of("topic").unwrap();
    let brokers = matches.value_of("brokers").unwrap();

    let outcome = round_trip(brokers, topic);
    let finished = ::std::time::Instant::now();
    match outcome {
        Ok(_) => println!("{:?}:\texit: success!", finished),
        Err(x) => println!("{:?}:\texit: error: {:?} =/", finished, x),
    }
}

0 comments on commit e4d59ae

Please sign in to comment.
You can’t perform that action at this time.