Skip to content

Commit

Permalink
chore: increase code coverage (#9)
Browse files Browse the repository at this point in the history
* fix: buildUniqueConsumerGroupId should not return just unknownhost when an error happens

* chore: add a test to provoke a producer failure

* chore: update TestStatus to provide a producer error

* chore: add the error test to the end
  • Loading branch information
3rs4lg4d0 committed Oct 22, 2023
1 parent c8023f7 commit c9bdfc3
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 5 deletions.
13 changes: 10 additions & 3 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ func (e *checkTimeoutError) Error() string {
return e.message
}

// hostnameProvider is a generic contract to get the hostname. We use the hostname
// to build the consumer group identifier.
type hostnameProvider func() (string, error)

// hnProvider is the default hostname provider.
var hnProvider hostnameProvider = os.Hostname

// KafkaConfig is used for configuring the go-kafka check.
type KafkaConfig struct {
BootstrapServers string // coma separated list of kafka brokers
Expand Down Expand Up @@ -120,13 +127,13 @@ func validateKafkaConfig(cfg *KafkaConfig) error {
// same health topic that will be unused on every restart of our application so better to
// have a periodic process to cleanup consumer groups in the kafka cluster.
func buildUniqueConsumerGroupId() string {
hostname, err := os.Hostname()
hostname, err := hnProvider()
if err != nil {
return "unknownhost"
hostname = "unknownhost"
}

timestamp := time.Now().UnixNano()
uniqueID := fmt.Sprintf("%s-%s-%d", consumerGroupPrefix, hostname, timestamp)
uniqueID := fmt.Sprintf("%s@@%s@@%d", consumerGroupPrefix, hostname, timestamp)

return uniqueID
}
Expand Down
70 changes: 68 additions & 2 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package kafka

import (
"context"
"errors"
"fmt"
"os"
"reflect"
"strings"
"testing"
"time"

confluentKafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/kafka"
Expand Down Expand Up @@ -42,6 +45,45 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func TestBuildUniqueConsumerGroupId(t *testing.T) {
realHostname, _ := os.Hostname()

type args struct {
hnProvider hostnameProvider
}
testcases := []struct {
name string
args args
wantHostname string
}{
{
name: "return the hostname",
args: args{
hnProvider: os.Hostname,
},
wantHostname: realHostname,
},
{
name: "return unknownhost is an error happens",
args: args{
hnProvider: func() (string, error) {
return "", errors.New("unexpected error")
},
},
wantHostname: "unknownhost",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
hnProvider = tc.args.hnProvider
cg := buildUniqueConsumerGroupId()
substrings := strings.Split(cg, "@@")
assert.Equal(t, tc.wantHostname, substrings[1])
})
}
}

func TestNewKafka(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
Expand Down Expand Up @@ -86,7 +128,7 @@ func TestNewKafka(t *testing.T) {
}

func TestStatus(t *testing.T) {
t.Run("Run until the system is stable", func(t *testing.T) {
t.Run("run until the system is stable", func(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
kafkaCheck, _ := NewKafka(KafkaConfig{
Expand All @@ -113,7 +155,7 @@ func TestStatus(t *testing.T) {
}
})

t.Run("Check that at least 1 check timeout is skipped", func(t *testing.T) {
t.Run("check that at least 1 check timeout is skipped", func(t *testing.T) {
ctx := context.Background()
iterations := 2
skipped := 0
Expand Down Expand Up @@ -144,4 +186,28 @@ func TestStatus(t *testing.T) {
}
assert.True(t, skipped > 0)
})

t.Run("set an invalid topic to force a producer error", func(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
c, _ := confluentKafka.NewConsumer(&confluentKafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
"group.id": buildUniqueConsumerGroupId(),
"auto.offset.reset": "latest",
})
p, _ := confluentKafka.NewProducer(&confluentKafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
})
kafkaCheck := &Kafka{
config: &KafkaConfig{
BootstrapServers: bootStrapServers[0],
Topic: "",
},
consumer: c,
producer: p,
}

_, err := kafkaCheck.Status()
assert.Equal(t, "error sending messages", err.Error())
})
}

0 comments on commit c9bdfc3

Please sign in to comment.