Skip to content

Commit

Permalink
Extend connectivity tests
Browse files Browse the repository at this point in the history
  • Loading branch information
atanasdinov committed Dec 23, 2022
1 parent dc2ed5f commit 63a3476
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 77 deletions.
163 changes: 123 additions & 40 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,42 +2,34 @@ package kafka

import (
"context"
"fmt"
"sync"
"sync/atomic"
"testing"
"time"

"github.com/Financial-Times/go-logger/v2"
"github.com/Shopify/sarama"
"github.com/aws/aws-sdk-go-v2/service/kafka"
"github.com/aws/aws-sdk-go-v2/service/kafka/types"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const testConsumerGroup = "testGroup"

var messages = []*sarama.ConsumerMessage{{Value: []byte("Message1")}, {Value: []byte("Message2")}}
var testMessages = []*sarama.ConsumerMessage{{Value: []byte("Message1")}, {Value: []byte("Message2")}}

func TestConsumerGroup_KafkaConnection(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
}

consumer := newTestConsumer(t, testTopic)
assert.NoError(t, consumer.Close())
type clusterDescriberMock struct {
describeCluster func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error)
}

func TestConsumer_InvalidConnection(t *testing.T) {
log := logger.NewUPPLogger("test", "INFO")
consumer := Consumer{
config: ConsumerConfig{
BrokersConnectionString: "unknown:9092",
ConsumerGroup: testConsumerGroup,
},
topics: []*Topic{NewTopic(testTopic)},
logger: log,
func (cd *clusterDescriberMock) DescribeClusterV2(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
if cd.describeCluster != nil {
return cd.describeCluster(ctx, input, optFns...)
}

assert.Error(t, consumer.ConnectivityCheck())
panic("DescribeClusterV2 not implemented")
}

func newTestConsumer(t *testing.T, topic string) *Consumer {
Expand All @@ -55,16 +47,115 @@ func newTestConsumer(t *testing.T, topic string) *Consumer {
return consumer
}

func TestKafkaConsumer_Start(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
func TestConsumer_Connectivity(t *testing.T) {
tests := []struct {
name string
requiresConnection bool
newConsumer func() *Consumer
expectedErr string
}{
{
name: "valid connection",
requiresConnection: true,
newConsumer: func() *Consumer {
return newTestConsumer(t, testTopic)
},
},
{
name: "invalid connection causes kafka error",
requiresConnection: false,
newConsumer: func() *Consumer {
return &Consumer{
config: ConsumerConfig{
BrokersConnectionString: "unknown:9092",
},
}
},
expectedErr: "kafka: client has run out of available brokers to talk to",
},
{
name: "invalid connection causes cluster status error",
requiresConnection: false,
newConsumer: func() *Consumer {
return &Consumer{
config: ConsumerConfig{
ClusterArn: new(string),
BrokersConnectionString: "unknown:9092",
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return nil, fmt.Errorf("status error")
},
},
}
},
expectedErr: "cluster status is unknown: status error",
},
{
name: "invalid connection is ignored during cluster maintenance",
requiresConnection: false,
newConsumer: func() *Consumer {
return &Consumer{
config: ConsumerConfig{
ClusterArn: new(string),
BrokersConnectionString: "unknown:9092",
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return &kafka.DescribeClusterV2Output{
ClusterInfo: &types.Cluster{
State: types.ClusterStateMaintenance,
},
}, nil
},
},
}
},
expectedErr: "",
},
{
name: "invalid connection causes kafka error when the cluster is active",
requiresConnection: false,
newConsumer: func() *Consumer {
return &Consumer{
config: ConsumerConfig{
ClusterArn: new(string),
BrokersConnectionString: "unknown:9092",
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return &kafka.DescribeClusterV2Output{
ClusterInfo: &types.Cluster{
State: types.ClusterStateActive,
},
}, nil
},
},
}
},
expectedErr: "kafka: client has run out of available brokers to talk to",
},
}

consumer := newTestConsumer(t, testTopic)
consumer.Start(func(msg FTMessage) {})
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
if test.requiresConnection && testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
return
}

require.NoError(t, consumer.ConnectivityCheck())
assert.NoError(t, consumer.Close())
consumer := test.newConsumer()
connectivityErr := consumer.ConnectivityCheck()

if test.expectedErr != "" {
require.Error(t, connectivityErr)
assert.Contains(t, connectivityErr.Error(), test.expectedErr)
return
}

assert.NoError(t, connectivityErr)
})
}
}

type mockConsumerGroupClaim struct {
Expand Down Expand Up @@ -170,21 +261,12 @@ func (m *mockConsumerGroupSession) Context() context.Context {
return m.ctx
}

func NewMockConsumer() *Consumer {
func newMockConsumer() *Consumer {
log := logger.NewUPPLogger("test", "INFO")

return &Consumer{
config: ConsumerConfig{
ConsumerGroup: "group",
BrokersConnectionString: "node",
},
topics: []*Topic{
{
Name: testTopic,
},
},
consumerGroup: &mockConsumerGroup{
messages: messages,
messages: testMessages,
},
monitor: &consumerMonitor{
subscriptions: map[string][]int32{},
Expand All @@ -199,16 +281,17 @@ func NewMockConsumer() *Consumer {
}
}

func TestConsumer_Start(t *testing.T) {
func TestConsumer_Workflow(t *testing.T) {
var count int32
consumer := NewMockConsumer()
consumer := newMockConsumer()

consumer.Start(func(msg FTMessage) {
atomic.AddInt32(&count, 1)
})

time.Sleep(1 * time.Second)
assert.Equal(t, int32(len(messages)), atomic.LoadInt32(&count))
time.Sleep(1 * time.Second) // Let message handling take place.

assert.Equal(t, int32(len(testMessages)), atomic.LoadInt32(&count))

assert.NoError(t, consumer.Close())
}
Loading

0 comments on commit 63a3476

Please sign in to comment.