Skip to content
golang lib for kafka
Go Makefile
Branch: master
Clone or download
Permalink
Type Name Latest commit message Commit time
Failed to load latest commit information.
test use TestMain Apr 17, 2019
tools 如果 topic 中含有*, 模糊匹配所有的 topic. Jun 11, 2019
.gitignore modified: .gitignore Jun 10, 2019
Gopkg.lock migrate to dep Jun 5, 2019
Gopkg.toml migrate to dep Jun 5, 2019
Makefile add clean command Jun 11, 2019
README.md new file: README.md Apr 12, 2019
alter_config_request.go offset+=copy() Apr 11, 2019
alter_config_response.go rename struct name Apr 11, 2019
apiversions_request.go remove correlationID from newrequest Jan 10, 2018
apiversions_response.go change correlationId to correlationID Nov 16, 2017
assign.go fix: log call error Jan 8, 2019
assign_test.go new file: assign_test.go Mar 6, 2018
broker.go fix: use tls to reopen conn Apr 17, 2019
broker_test.go NewBrokersWithConfig Apr 17, 2019
brokers.go fix: set []string to topics to get broekrs info Jun 11, 2019
compressor.go add: compressor in producer Feb 25, 2018
config.go remove CommitAfterFetch settings May 29, 2019
consumer.go use NewSimpleConsumerWithBrokers May 28, 2019
create_topics_request.go fix: add timeout setting & encode rrro May 28, 2019
create_topics_response.go add create_topics_request Apr 16, 2019
describe_configs_request.go fix: change uint16 to int16 and then int Apr 11, 2019
describe_configs_response.go fix: change uint16 to int16 and then int Apr 11, 2019
describe_groups_request.go remove correlationID from newrequest Jan 10, 2018
describe_groups_response.go getErrorFromErrorCode Dec 18, 2017
error.go add noAvaliableBrokers error Mar 19, 2019
errorcode.go add more errorcode Mar 19, 2019
fetch_request.go offset += copy() Apr 12, 2019
fetch_response.go
findcoordinator_request.go remove correlationID from newrequest Jan 10, 2018
findcoordinator_response.go modified: findcoordinator_response.go Jan 8, 2018
findcoordinator_test.go timeout Mar 28, 2018
group.go fix: #2 Dec 5, 2018
group_consumer.go call leave api when group_consumer stops Jul 2, 2019
group_test.go NewBrokersWithConfig Apr 17, 2019
gzip_compressor.go add: compressor in producer Feb 25, 2018
heartbeat_request.go offset += copy() Apr 12, 2019
heartbeat_response.go getErrorFromErrorCode Dec 18, 2017
helper.go NewBrokersWithConfig Apr 17, 2019
joingroup_request.go
joingroup_response.go getErrorFromErrorCode Dec 18, 2017
leave_group_request.go offset += copy() Apr 12, 2019
leave_group_response.go getErrorFromErrorCode Dec 18, 2017
list_groups_request.go remove correlationID from newrequest Jan 10, 2018
list_groups_response.go change correlationId to correlationID Nov 16, 2017
lz4_compressor.go use github.com/bkaradzic/go-lz4 Feb 27, 2018
message.go fix: deadloop if decomress message error Jul 10, 2019
metadata_request.go in V0, An array of topics to fetch metadata for. If no topics are spe… Jun 10, 2019
metadata_response.go update: metadata api to v1. we need controllerID Apr 16, 2019
none_compressor.go add: compressor in producer Feb 25, 2018
offset_commit_request.go offset += copy() Apr 12, 2019
offset_commit_response.go getErrorFromErrorCode Dec 18, 2017
offset_commit_test.go fix golink format Apr 16, 2019
offset_fetch_request.go offset += copy() Apr 12, 2019
offset_fetch_response.go set type of PartitonID to int32 Mar 6, 2018
offset_fetch_test.go use "childe/pflag" & "golang/glog" Apr 3, 2018
offset_request.go offset += copy() Apr 12, 2019
offset_response.go change type of Offset to int64 Mar 6, 2018
offsets_test.go rename PartitionID in metaResponse & set to int32 Mar 6, 2018
produce_request.go offset += copy() Apr 12, 2019
produce_response.go fix pointer err Mar 21, 2018
producer.go NewBrokersWithConfig Apr 17, 2019
request.go add create_topics_request Apr 16, 2019
resource_types.go rename resourcetype const Apr 11, 2019
sasl_authenticate_request.go remove log Jan 10, 2019
sasl_authenticate_response.go add plain sasl authenticate Jan 9, 2019
sasl_handshake_request.go add plain sasl authenticate Jan 9, 2019
sasl_handshake_response.go new file: sasl_handshake_response.go Jan 8, 2019
sasl_plain.go add plain sasl authenticate Jan 9, 2019
simple_consumer.go fix #10 Jun 9, 2019
simple_producer.go fix: data race May 31, 2019
snappy_compressor.go add: compressor in producer Feb 25, 2018
sync_group_request.go offset += copy() Apr 12, 2019
sync_group_response.go getErrorFromErrorCode Dec 18, 2017

README.md

Group Consumer

import  "github.com/childe/healer"

configMap := map[string]interface{}
configMap["bootstrap.servers"] = "127.0.0.1:9092,127.0.0.1:9093"
configMap["group.id"] = "mygroup"

config, err := healer.GetConsumerConfig(configMap)
if err != nil {
	glog.Errorf("could not create consumer config: %s", err)
}

c, err := healer.NewGroupConsumer("TOPICNAME", config)
if err != nil {
	glog.Errorf("could not create GroupConsumer: %s", err)
}
defer c.Close()

messages, err := c.Consume(nil)
if err != nil {
	glog.Fatalf("failed to consume: %s", err)
}

for {
	message := <-messages
	fmt.Printf("%s:%d:%d:%s\n", message.TopicName, message.PartitionID, message.Message.Offset, message.Message.Value)
}


Producer

import  "github.com/childe/healer"

configMap := map[string]interface{}
configMap["bootstrap.servers"] = "127.0.0.1:9092,127.0.0.1:9093"
config, err := healer.GetProducerConfig(configMap)

if err != nil {
	glog.Errorf("coult not create producer config: %s", err)
	return 4
}

producer := healer.NewProducer("TOPICNAME", config)
if producer == nil {
	glog.Error("could not create producer")
}
defer producer.Close()

key := []byte("")
msg := []byte("")
producer.AddMessage(key, msg)
//producer.AddMessage(nil, msg)

Console Consumer

Simple Consumer

Group Consumer (assign certain parititons, do not need join group)

You can’t perform that action at this time.