go kafka
Go Makefile
Switch branches/tags
Nothing to show
Clone or download
Fetching latest commit…
Cannot retrieve the latest commit at this time.
Permalink
Failed to load latest commit information.
tools
.gitignore
.travis.yml
LICENSE
Makefile
README.md
consumer.go
converts.go
kafka.go
kafka_test.go
message.go
payload_codec.go
publisher.go
request.go
timing.go

README.md

kafka - Publisher & Consumer for Kafka in Go

Kafka is a distributed publish-subscribe messaging system: (http://kafka.apache.org)

Go language: (http://golang.org/)

Forked from: https://github.com/jdamick/kafka/.

Changes

4/13 - Merged back from the apache repository & outstanding patches from jira applied

Get up and running

Install go (version 1):
For more info see: http://weekly.golang.org/doc/install.html#install

Make sure to set your GOROOT properly (http://golang.org/doc/install.html#environment). Also set your GOPATH appropriately: http://weekly.golang.org/doc/code.html#tmp_13

Build from source:

make kafka
Make the tools (publisher & consumer)
make tools
Start zookeeper, Kafka server
For more info on Kafka, see: http://incubator.apache.org/kafka/quickstart.html

Tools

Start a consumer:


   $GOPATH/bin/consumer -topic test -consumeforever
  Consuming Messages :
  From: localhost:9092, topic: test, partition: 0
   ---------------------- 

Now the consumer will just poll until a message is received.

Publish a message:


  $GOPATH/bin/publisher -topic test -message "Hello World"

The consumer should output message.

API Usage

Publishing



broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewMessage([]byte("tesing 1 2 3")))

Publishing Compressed Messages



broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
broker.Publish(kafka.NewCompressedMessage([]byte("tesing 1 2 3")))

Consumer


broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
broker.Consume(func(msg *kafka.Message) { msg.Print() })

Or the consumer can use a channel based approach:


broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
go broker.ConsumeOnChannel(msgChan, 10, quitChan)

Consuming Offsets


broker := kafka.NewBrokerOffsetConsumer("localhost:9092", "mytesttopic", 0)
offsets, err := broker.GetOffsets(-1, 1)

Contact

jeffreydamick (at) gmail (dot) com

http://twitter.com/jeffreydamick

Big thank you to NeuStar for sponsoring this work.