Skip to content

TerrexTech/go-kafkautils

Repository files navigation

KafkaUtils for Sarama library

This is just a simple Go library providing convenience-wrappers over Shopify's Sarama library.

Usage:

  • Install dep dependencies:
dep ensure
  • Import/Use in your code!
import github.com/TerrexTech/go-kafkautils/kafka

Minimal examples:

Consumer Group (referred to as just Consumer in library):

// Handler for Consumer messages
type msgHandler struct {}

func (mh msgHandler) Setup(s sarama.ConsumerGroupSession) error {
	return nil
}

func (mh msgHandler) Cleanup(s sarama.ConsumerGroupSession) error {
	return nil
}

func (mh msgHandler) ConsumeClaim(
	session sarama.ConsumerGroupSession,
	claim sarama.ConsumerGroupClaim,
) error {
	for msg := range claim.Messages() {
		v := string(msg.Value)
		log.Println(v)
  }
  return nil
}

func main() {
  config := &kafka.ConsumerConfig{
    ConsumerGroup: "test",
    KafkaBrokers:  []string{"localhost:9092"},
    Topics:        []string{"test"},
  }
  consumer, err := kafka.NewConsumer(config)
  if err != nil {
    panic(err)
  }

  // Read Errors
  go func() {
    for err := proxyConsumer.Errors() {
      log.Println(err)
    }
  }()

  // Read Messages
  for msg := proxyConsumer.Messages() {
    log.Println(msg)
    // ...More Operations
  }

  // Don't forget to close when done
  consumer.Close()
}

Producer:

config := &producer.Config{
  KafkaBrokers: []string{"localhost:9092"},
}
producer, err := kafka.NewProducer(config)
if err != nil {
  panic(err)
}

go func() {
  for err := asyncProducer.Errors() {
    log.Println(err)
  }
}()

// Create message
strTime := strconv.Itoa(int(time.Now().Unix()))
msg := producer.CreateKeyMessage("testTopic", strTime, []byte("testValue"))

// Produce message
asyncProducer.Input() <- msg