diff --git a/kafka.go b/kafka.go index 3bf9e27..f73e199 100644 --- a/kafka.go +++ b/kafka.go @@ -31,6 +31,13 @@ func (e *checkTimeoutError) Error() string { return e.message } +// hostnameProvider is a generic contract to get the hostname. We use the hostname +// to build the consumer group identifier. +type hostnameProvider func() (string, error) + +// hnProvider is the default hostname provider. +var hnProvider hostnameProvider = os.Hostname + // KafkaConfig is used for configuring the go-kafka check. type KafkaConfig struct { BootstrapServers string // coma separated list of kafka brokers @@ -120,13 +127,13 @@ func validateKafkaConfig(cfg *KafkaConfig) error { // same health topic that will be unused on every restart of our application so better to // have a periodic process to cleanup consumer groups in the kafka cluster. func buildUniqueConsumerGroupId() string { - hostname, err := os.Hostname() + hostname, err := hnProvider() if err != nil { - return "unknownhost" + hostname = "unknownhost" } timestamp := time.Now().UnixNano() - uniqueID := fmt.Sprintf("%s-%s-%d", consumerGroupPrefix, hostname, timestamp) + uniqueID := fmt.Sprintf("%s@@%s@@%d", consumerGroupPrefix, hostname, timestamp) return uniqueID } diff --git a/kafka_test.go b/kafka_test.go index 3a40c28..d2b2624 100644 --- a/kafka_test.go +++ b/kafka_test.go @@ -2,9 +2,11 @@ package kafka import ( "context" + "errors" "fmt" "os" "reflect" + "strings" "testing" "time" @@ -42,6 +44,45 @@ func TestMain(m *testing.M) { os.Exit(code) } +func TestBuildUniqueConsumerGroupId(t *testing.T) { + realHostname, _ := os.Hostname() + + type args struct { + hnProvider hostnameProvider + } + testcases := []struct { + name string + args args + wantHostname string + }{ + { + name: "return the hostname", + args: args{ + hnProvider: os.Hostname, + }, + wantHostname: realHostname, + }, + { + name: "return unknownhost is an error happens", + args: args{ + hnProvider: func() (string, error) { + return "", errors.New("unexpected error") + }, + }, + wantHostname: "unknownhost", + }, + } + + for _, tc := range testcases { + t.Run(tc.name, func(t *testing.T) { + hnProvider = tc.args.hnProvider + cg := buildUniqueConsumerGroupId() + substrings := strings.Split(cg, "@@") + assert.Equal(t, tc.wantHostname, substrings[1]) + }) + } +} + func TestNewKafka(t *testing.T) { ctx := context.Background() bootStrapServers, _ := kafkaContainer.Brokers(ctx)