Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9

Sarama Cluster

Cluster extensions for Sarama, the Go client library for Apache Kafka 0.9 (and later).

Documentation

Documentation and examples are available via godoc at http://godoc.org/github.com/bsm/sarama-cluster

Examples

Consumers have two modes of operation. In the default multiplexed mode, messages (and errors) from multiple topics and partitions are all passed to a single channel:

package main

import (
	"fmt"
	"log"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, enable errors and notifications
	config := cluster.NewConfig()
	config.Consumer.Return.Errors = true
	config.Group.Return.Notifications = true

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume errors
	go func() {
		for err := range consumer.Errors() {
			log.Printf("Error: %s\n", err.Error())
		}
	}()

	// consume notifications
	go func() {
		for ntf := range consumer.Notifications() {
			log.Printf("Rebalanced: %+v\n", ntf)
		}
	}()

	// consume messages, watch signals
	for {
		select {
		case msg, ok := <-consumer.Messages():
			if ok {
				fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
				consumer.MarkOffset(msg, "")	// mark message as processed
			}
		case <-signals:
			return
		}
	}
}

Users who require access to individual partitions can use the partitioned mode, which exposes partition-level consumers:

package main

import (
	"fmt"
	"os"
	"os/signal"

	cluster "github.com/bsm/sarama-cluster"
)

func main() {

	// init (custom) config, set mode to ConsumerModePartitions
	config := cluster.NewConfig()
	config.Group.Mode = cluster.ConsumerModePartitions

	// init consumer
	brokers := []string{"127.0.0.1:9092"}
	topics := []string{"my_topic", "other_topic"}
	consumer, err := cluster.NewConsumer(brokers, "my-consumer-group", topics, config)
	if err != nil {
		panic(err)
	}
	defer consumer.Close()

	// trap SIGINT to trigger a shutdown.
	signals := make(chan os.Signal, 1)
	signal.Notify(signals, os.Interrupt)

	// consume partitions
	for {
		select {
		case part, ok := <-consumer.Partitions():
			if !ok {
				return
			}

			// start a separate goroutine to consume messages
			go func(pc cluster.PartitionConsumer) {
				for msg := range pc.Messages() {
					fmt.Fprintf(os.Stdout, "%s/%d/%d\t%s\t%s\n", msg.Topic, msg.Partition, msg.Offset, msg.Key, msg.Value)
					consumer.MarkOffset(msg, "")	// mark message as processed
				}
			}(part)
		case <-signals:
			return
		}
	}
}
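
The difference between the two modes can be pictured with plain Go channels. The following is a toy model only, not the library's implementation: the `message` type and the `feed` and `multiplex` helpers are hypothetical names introduced for illustration. Each `feed` channel stands in for one partition-level consumer (partitioned mode), and `multiplex` merges them into one shared channel (multiplexed mode).

```go
package main

import (
	"fmt"
	"sync"
)

// message is a hypothetical stand-in for a consumed Kafka message.
type message struct {
	Topic     string
	Partition int32
	Offset    int64
}

// feed simulates one partition: it emits n messages with increasing
// offsets on its own channel and then closes it.
func feed(topic string, partition int32, n int64) <-chan message {
	ch := make(chan message)
	go func() {
		defer close(ch)
		for i := int64(0); i < n; i++ {
			ch <- message{Topic: topic, Partition: partition, Offset: i}
		}
	}()
	return ch
}

// multiplex forwards every message from the per-partition channels onto
// one shared channel, which is closed once all inputs are drained.
func multiplex(parts ...<-chan message) <-chan message {
	out := make(chan message)
	var wg sync.WaitGroup
	for _, p := range parts {
		wg.Add(1)
		go func(p <-chan message) {
			defer wg.Done()
			for m := range p {
				out <- m
			}
		}(p)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

func main() {
	merged := multiplex(
		feed("my_topic", 0, 2),
		feed("my_topic", 1, 2),
		feed("other_topic", 0, 2),
	)
	// Order across partitions is not deterministic; order within a
	// single partition is preserved.
	for m := range merged {
		fmt.Printf("%s/%d/%d\n", m.Topic, m.Partition, m.Offset)
	}
}
```

In this sketch the single merged channel is convenient when every message is handled the same way, while keeping the per-partition channels separate lets each partition be processed in its own goroutine.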

Running tests

You need to install Ginkgo & Gomega to run tests. Please see http://onsi.github.io/ginkgo for more details.

To run tests, call:

$ make test

Troubleshooting

Consumer not receiving any messages?

By default, Sarama's Config.Consumer.Offsets.Initial is set to sarama.OffsetNewest. This means that a brand new consumer, one that has never committed any offsets to Kafka, will only receive messages written after it starts consuming.
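
The rule can be sketched as a small stand-alone function. This is a simplified model under stated assumptions, not Sarama's actual code: `startOffset` and `noCommit` are hypothetical names, and the two constants are local copies mirroring the values of sarama.OffsetNewest (-1) and sarama.OffsetOldest (-2).

```go
package main

import "fmt"

// Local stand-ins mirroring sarama.OffsetNewest and sarama.OffsetOldest.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// noCommit is a hypothetical sentinel for "this group never committed".
const noCommit int64 = -1

// startOffset models where a consumer begins reading a partition.
// committed is the group's committed offset (or noCommit), initial is
// the configured initial-offset policy, oldest is the first offset still
// available, and next is the offset the next written message will get.
func startOffset(committed, initial, oldest, next int64) int64 {
	if committed != noCommit {
		return committed // resume where the group left off
	}
	if initial == offsetOldest {
		return oldest // replay everything still in the partition
	}
	return next // default: only messages written from now on
}

func main() {
	// A partition currently holding offsets 0..99.
	fmt.Println(startOffset(noCommit, offsetNewest, 0, 100)) // 100
	fmt.Println(startOffset(noCommit, offsetOldest, 0, 100)) // 0
	fmt.Println(startOffset(42, offsetNewest, 0, 100))       // 42
}
```

Note that in this model the initial-offset setting only matters when no offset has been committed; once the group has committed, the consumer resumes from the committed position.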

If a consumer with no offsets committed to Kafka should instead receive all messages from the start of the topic, set Config.Consumer.Offsets.Initial to sarama.OffsetOldest.
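
A minimal configuration sketch for this setting follows. It assumes the Sarama import path github.com/Shopify/sarama, which is not stated in this README, and relies on the cluster config exposing Sarama's Consumer settings as the paragraph above describes. It only builds the config and does not connect to a broker.

```go
package main

import (
	"fmt"

	"github.com/Shopify/sarama" // assumed import path for Sarama
	cluster "github.com/bsm/sarama-cluster"
)

func main() {
	// Start from the oldest available message when the group has no
	// committed offsets yet.
	config := cluster.NewConfig()
	config.Consumer.Offsets.Initial = sarama.OffsetOldest

	fmt.Println(config.Consumer.Offsets.Initial == sarama.OffsetOldest)
}
```

The resulting config can then be passed to cluster.NewConsumer in the same way as in the examples above.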