Skip to content

Commit

Permalink
fix: buildUniqueConsumerGroupId should not return just unknownhost wh…
Browse files Browse the repository at this point in the history
…en an error happens
  • Loading branch information
3rs4lg4d0 committed Oct 22, 2023
1 parent c8023f7 commit 721881b
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 3 deletions.
13 changes: 10 additions & 3 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ func (e *checkTimeoutError) Error() string {
return e.message
}

// hostnameProvider is a generic contract to get the hostname. We use the hostname
// to build the consumer group identifier.
type hostnameProvider func() (string, error)

// hnProvider is the default hostname provider.
var hnProvider hostnameProvider = os.Hostname

// KafkaConfig is used for configuring the go-kafka check.
type KafkaConfig struct {
BootstrapServers string // coma separated list of kafka brokers
Expand Down Expand Up @@ -120,13 +127,13 @@ func validateKafkaConfig(cfg *KafkaConfig) error {
// same health topic that will be unused on every restart of our application so better to
// have a periodic process to cleanup consumer groups in the kafka cluster.
func buildUniqueConsumerGroupId() string {
hostname, err := os.Hostname()
hostname, err := hnProvider()
if err != nil {
return "unknownhost"
hostname = "unknownhost"
}

timestamp := time.Now().UnixNano()
uniqueID := fmt.Sprintf("%s-%s-%d", consumerGroupPrefix, hostname, timestamp)
uniqueID := fmt.Sprintf("%s@@%s@@%d", consumerGroupPrefix, hostname, timestamp)

return uniqueID
}
Expand Down
41 changes: 41 additions & 0 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package kafka

import (
"context"
"errors"
"fmt"
"os"
"reflect"
"strings"
"testing"
"time"

Expand Down Expand Up @@ -42,6 +44,45 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func TestBuildUniqueConsumerGroupId(t *testing.T) {
realHostname, _ := os.Hostname()

type args struct {
hnProvider hostnameProvider
}
testcases := []struct {
name string
args args
wantHostname string
}{
{
name: "return the hostname",
args: args{
hnProvider: os.Hostname,
},
wantHostname: realHostname,
},
{
name: "return unknownhost is an error happens",
args: args{
hnProvider: func() (string, error) {
return "", errors.New("unexpected error")
},
},
wantHostname: "unknownhost",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
hnProvider = tc.args.hnProvider
cg := buildUniqueConsumerGroupId()
substrings := strings.Split(cg, "@@")
assert.Equal(t, tc.wantHostname, substrings[1])
})
}
}

func TestNewKafka(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
Expand Down

0 comments on commit 721881b

Please sign in to comment.