Kafka is a distributed publish-subscribe messaging system: (http://kafka.apache.org)
Go language: (http://golang.org/)
Forked from: https://github.com/jdamick/kafka/.
4/13 - Merged back from the apache repository & outstanding patches from jira applied
Install go (version 1):
For more info see: http://weekly.golang.org/doc/install.html#install
Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). Also set your GOPATH appropriately: http://weekly.golang.org/doc/code.html#tmp_13
Build from source:
make kafka
Make the tools (publisher & consumer)
make tools
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html
Start a consumer:
$GOPATH/bin/consumer -topic test -consumeforever
Consuming Messages :
From: localhost:9092, topic: test, partition: 0
----------------------
Now the consumer will just poll until a message is received.
Publish a message:
$GOPATH/bin/publisher -topic test -message "Hello World"
The consumer should output message.
broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewMessage([]byte("tesing 1 2 3")))
broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewCompressedMessage([]byte("tesing 1 2 3")))
broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
broker.Consume(func(msg *kafka.Message) { msg.Print() })
Or the consumer can use a channel based approach:
broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
go broker.ConsumeOnChannel(msgChan, 10, quitChan)
broker := kafka.NewBrokerOffsetConsumer("localhost:9092", "mytesttopic", 0)
offsets, err := broker.GetOffsets(-1, 1)
jeffreydamick (at) gmail (dot) com
http://twitter.com/jeffreydamick
Big thank you to NeuStar for sponsoring this work.