Package gkafka provides producer and consumer clients for Kafka servers.
go get -u github.com/gogf/gkafka
Or, if you use Go modules, add the following line to your go.mod:
require github.com/gogf/gkafka latest
package main
import (
"fmt"
"github.com/gogf/gkafka"
"time"
)
// newKafkaClientProducer returns a gkafka client built from a fresh config
// that targets the broker at localhost:9092 and uses topic as its Topics
// setting. AutoMarkOffset is set to false.
func newKafkaClientProducer(topic string) *gkafka.Client {
	cfg := gkafka.NewConfig()
	cfg.Topics = topic
	cfg.Servers = "localhost:9092"
	cfg.AutoMarkOffset = false
	return gkafka.NewClient(cfg)
}
// main is the producer example: once per second it sends the current time,
// formatted as a string, to the "test" topic. A failed send is printed and
// the loop carries on; the loop itself has no exit condition.
func main() {
	producer := newKafkaClientProducer("test")
	defer producer.Close()

	for {
		payload := time.Now().String()
		fmt.Println("produce:", payload)

		err := producer.SyncSend(&gkafka.Message{Value: []byte(payload)})
		if err != nil {
			fmt.Println(err)
		}

		time.Sleep(time.Second)
	}
}
package main
import (
"fmt"
"github.com/gogf/gkafka"
)
// newKafkaClientConsumer returns a gkafka client built from a fresh config
// that targets the broker at localhost:9092, with topic as its Topics setting
// and group as its GroupId. AutoMarkOffset is set to false.
func newKafkaClientConsumer(topic, group string) *gkafka.Client {
	cfg := gkafka.NewConfig()
	cfg.GroupId = group
	cfg.Topics = topic
	cfg.Servers = "localhost:9092"
	cfg.AutoMarkOffset = false
	return gkafka.NewClient(cfg)
}
// main is the consumer example: it receives messages from the "test" topic as
// a member of "test-group", prints each one's partition, offset and value,
// and then calls MarkOffset on it. The first receive error is printed and
// ends the loop.
func main() {
	const (
		topic = "test"
		group = "test-group"
	)

	consumer := newKafkaClientConsumer(topic, group)
	defer consumer.Close()

	for {
		msg, err := consumer.Receive()
		if err != nil {
			fmt.Println(err)
			break
		}

		fmt.Println("consume:", msg.Partition, msg.Offset, string(msg.Value))
		// The client config sets AutoMarkOffset to false, and this example
		// calls MarkOffset itself after printing the message.
		msg.MarkOffset()
	}
}
package main
import (
"fmt"
"github.com/gogf/gkafka"
)
// main is the topic-listing example: it connects to the broker at
// localhost:9092 and prints whatever client.Topics returns.
func main() {
	cfg := gkafka.NewConfig()
	cfg.Servers = "localhost:9092"

	kafka := gkafka.NewClient(cfg)
	defer kafka.Close()

	// Every value returned by Topics is forwarded to Println unchanged.
	fmt.Println(kafka.Topics())
}
gkafka
is licensed under the MIT License, 100% free and open-source, forever.