Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: increase code coverage #9

Merged
merged 4 commits into from
Oct 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,13 @@ func (e *checkTimeoutError) Error() string {
return e.message
}

// hostnameProvider is a generic contract to get the hostname. We use the hostname
// to build the consumer group identifier.
type hostnameProvider func() (string, error)

// hnProvider is the default hostname provider.
var hnProvider hostnameProvider = os.Hostname

// KafkaConfig is used for configuring the go-kafka check.
type KafkaConfig struct {
BootstrapServers string // coma separated list of kafka brokers
Expand Down Expand Up @@ -120,13 +127,13 @@ func validateKafkaConfig(cfg *KafkaConfig) error {
// same health topic that will be unused on every restart of our application so better to
// have a periodic process to cleanup consumer groups in the kafka cluster.
func buildUniqueConsumerGroupId() string {
hostname, err := os.Hostname()
hostname, err := hnProvider()
if err != nil {
return "unknownhost"
hostname = "unknownhost"
}

timestamp := time.Now().UnixNano()
uniqueID := fmt.Sprintf("%s-%s-%d", consumerGroupPrefix, hostname, timestamp)
uniqueID := fmt.Sprintf("%s@@%s@@%d", consumerGroupPrefix, hostname, timestamp)

return uniqueID
}
Expand Down
70 changes: 68 additions & 2 deletions kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,15 @@ package kafka

import (
"context"
"errors"
"fmt"
"os"
"reflect"
"strings"
"testing"
"time"

confluentKafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/stretchr/testify/assert"
"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/modules/kafka"
Expand Down Expand Up @@ -42,6 +45,45 @@ func TestMain(m *testing.M) {
os.Exit(code)
}

func TestBuildUniqueConsumerGroupId(t *testing.T) {
realHostname, _ := os.Hostname()

type args struct {
hnProvider hostnameProvider
}
testcases := []struct {
name string
args args
wantHostname string
}{
{
name: "return the hostname",
args: args{
hnProvider: os.Hostname,
},
wantHostname: realHostname,
},
{
name: "return unknownhost is an error happens",
args: args{
hnProvider: func() (string, error) {
return "", errors.New("unexpected error")
},
},
wantHostname: "unknownhost",
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
hnProvider = tc.args.hnProvider
cg := buildUniqueConsumerGroupId()
substrings := strings.Split(cg, "@@")
assert.Equal(t, tc.wantHostname, substrings[1])
})
}
}

func TestNewKafka(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
Expand Down Expand Up @@ -86,7 +128,7 @@ func TestNewKafka(t *testing.T) {
}

func TestStatus(t *testing.T) {
t.Run("Run until the system is stable", func(t *testing.T) {
t.Run("run until the system is stable", func(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
kafkaCheck, _ := NewKafka(KafkaConfig{
Expand All @@ -113,7 +155,7 @@ func TestStatus(t *testing.T) {
}
})

t.Run("Check that at least 1 check timeout is skipped", func(t *testing.T) {
t.Run("check that at least 1 check timeout is skipped", func(t *testing.T) {
ctx := context.Background()
iterations := 2
skipped := 0
Expand Down Expand Up @@ -144,4 +186,28 @@ func TestStatus(t *testing.T) {
}
assert.True(t, skipped > 0)
})

t.Run("set an invalid topic to force a producer error", func(t *testing.T) {
ctx := context.Background()
bootStrapServers, _ := kafkaContainer.Brokers(ctx)
c, _ := confluentKafka.NewConsumer(&confluentKafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
"group.id": buildUniqueConsumerGroupId(),
"auto.offset.reset": "latest",
})
p, _ := confluentKafka.NewProducer(&confluentKafka.ConfigMap{
"bootstrap.servers": bootStrapServers,
})
kafkaCheck := &Kafka{
config: &KafkaConfig{
BootstrapServers: bootStrapServers[0],
Topic: "",
},
consumer: c,
producer: p,
}

_, err := kafkaCheck.Status()
assert.Equal(t, "error sending messages", err.Error())
})
}