Skip to content

Commit

Permalink
Revamp connectivity check
Browse files Browse the repository at this point in the history
  • Loading branch information
atanasdinov committed Feb 22, 2023
1 parent dee5d8c commit 2de4d54
Show file tree
Hide file tree
Showing 5 changed files with 158 additions and 34 deletions.
32 changes: 32 additions & 0 deletions connectivity_check.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kafka

import (
"fmt"
"time"

"github.com/Shopify/sarama"
)

var connectivityTimeout = 3 * time.Second

var ErrConnectivityTimedOut = fmt.Errorf("kafka connectivity timed out")

func checkConnectivity(brokers []string) error {
errCh := make(chan error)

go func() {
client, err := sarama.NewClient(brokers, nil)
if err == nil {
_ = client.Close()
}

errCh <- err
}()

select {
case <-time.After(connectivityTimeout):
return ErrConnectivityTimedOut
case err := <-errCh:
return err
}
}
30 changes: 10 additions & 20 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kafka
import (
"context"
"fmt"
"math/rand"
"strings"
"time"

Expand Down Expand Up @@ -46,7 +45,13 @@ type ConsumerConfig struct {
}

func NewConsumer(config ConsumerConfig, topics []*Topic, log *logger.UPPLogger) (*Consumer, error) {
consumerGroup, err := newConsumerGroup(config)
if config.Options == nil {
config.Options = DefaultConsumerOptions()
}

brokers := strings.Split(config.BrokersConnectionString, ",")

consumerGroup, err := sarama.NewConsumerGroup(brokers, config.ConsumerGroup, config.Options)
if err != nil {
return nil, fmt.Errorf("creating consumer group: %w", err)
}
Expand Down Expand Up @@ -152,22 +157,16 @@ func (c *Consumer) Close() error {

// ConnectivityCheck checks whether a connection to Kafka can be established.
func (c *Consumer) ConnectivityCheck() error {
config := ConsumerConfig{
BrokersConnectionString: c.config.BrokersConnectionString,
ConsumerGroup: fmt.Sprintf("healthcheck-%d", rand.Intn(100)),
Options: c.config.Options,
}
consumerGroup, err := newConsumerGroup(config)
if err != nil {
brokers := strings.Split(c.config.BrokersConnectionString, ",")

if err := checkConnectivity(brokers); err != nil {
if c.config.ClusterArn != nil {
return verifyHealthErrorSeverity(err, c.clusterDescriber, c.config.ClusterArn)
}

return err
}

_ = consumerGroup.Close()

return nil
}

Expand All @@ -185,15 +184,6 @@ func (c *Consumer) MonitorCheck() error {
return nil
}

func newConsumerGroup(config ConsumerConfig) (sarama.ConsumerGroup, error) {
if config.Options == nil {
config.Options = DefaultConsumerOptions()
}

brokers := strings.Split(config.BrokersConnectionString, ",")
return sarama.NewConsumerGroup(brokers, config.ConsumerGroup, config.Options)
}

func newConsumerGroupOffsetFetchers(brokerConnectionString string) (consumerOffsetFetcher, topicOffsetFetcher, error) {
brokers := strings.Split(brokerConnectionString, ",")
client, err := sarama.NewClient(brokers, sarama.NewConfig())
Expand Down
52 changes: 52 additions & 0 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,58 @@ func TestConsumer_Connectivity(t *testing.T) {
},
expectedErr: "kafka: client has run out of available brokers to talk to",
},
{
name: "connectivity times out when the cluster is active",
requiresConnection: false,
newConsumer: func() *Consumer {
brokerID := int32(1)
broker := sarama.NewMockBroker(t, brokerID)
broker.SetLatency(connectivityTimeout + time.Second)

return &Consumer{
config: ConsumerConfig{
ClusterArn: new(string),
BrokersConnectionString: broker.Addr(),
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return &kafka.DescribeClusterV2Output{
ClusterInfo: &types.Cluster{
State: types.ClusterStateActive,
},
}, nil
},
},
}
},
expectedErr: "kafka connectivity timed out",
},
{
name: "connectivity timeout is ignored during cluster maintenance",
requiresConnection: false,
newConsumer: func() *Consumer {
brokerID := int32(1)
broker := sarama.NewMockBroker(t, brokerID)
broker.SetLatency(connectivityTimeout + time.Second)

return &Consumer{
config: ConsumerConfig{
ClusterArn: new(string),
BrokersConnectionString: broker.Addr(),
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return &kafka.DescribeClusterV2Output{
ClusterInfo: &types.Cluster{
State: types.ClusterStateMaintenance,
},
}, nil
},
},
}
},
expectedErr: "",
},
}

for _, test := range tests {
Expand Down
24 changes: 10 additions & 14 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,13 @@ type ProducerConfig struct {
}

func NewProducer(config ProducerConfig) (*Producer, error) {
producer, err := newProducer(config)
if config.Options == nil {
config.Options = DefaultProducerOptions()
}

brokers := strings.Split(config.BrokersConnectionString, ",")

producer, err := sarama.NewSyncProducer(brokers, config.Options)
if err != nil {
return nil, fmt.Errorf("creating producer: %w", err)
}
Expand Down Expand Up @@ -58,29 +64,19 @@ func (p *Producer) Close() error {

// ConnectivityCheck checks whether a connection to Kafka can be established.
func (p *Producer) ConnectivityCheck() error {
producer, err := newProducer(p.config)
if err != nil {
brokers := strings.Split(p.config.BrokersConnectionString, ",")

if err := checkConnectivity(brokers); err != nil {
if p.config.ClusterArn != nil {
return verifyHealthErrorSeverity(err, p.clusterDescriber, p.config.ClusterArn)
}

return err
}

_ = producer.Close()

return nil
}

func newProducer(config ProducerConfig) (sarama.SyncProducer, error) {
if config.Options == nil {
config.Options = DefaultProducerOptions()
}

brokers := strings.Split(config.BrokersConnectionString, ",")
return sarama.NewSyncProducer(brokers, config.Options)
}

// DefaultProducerOptions creates a new Sarama producer configuration with default values.
func DefaultProducerOptions() *sarama.Config {
config := sarama.NewConfig()
Expand Down
54 changes: 54 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/Shopify/sarama/mocks"
"github.com/aws/aws-sdk-go-v2/service/kafka"
"github.com/aws/aws-sdk-go-v2/service/kafka/types"
Expand Down Expand Up @@ -136,6 +138,58 @@ func TestProducer_Connectivity(t *testing.T) {
},
expectedErr: "kafka: client has run out of available brokers to talk to",
},
{
name: "connectivity times out when the cluster is active",
requiresConnection: false,
newProducer: func() *Producer {
brokerID := int32(1)
broker := sarama.NewMockBroker(t, brokerID)
broker.SetLatency(connectivityTimeout + time.Second)

return &Producer{
config: ProducerConfig{
ClusterArn: new(string),
BrokersConnectionString: broker.Addr(),
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return &kafka.DescribeClusterV2Output{
ClusterInfo: &types.Cluster{
State: types.ClusterStateActive,
},
}, nil
},
},
}
},
expectedErr: "kafka connectivity timed out",
},
{
name: "connectivity timeout is ignored during cluster maintenance",
requiresConnection: false,
newProducer: func() *Producer {
brokerID := int32(1)
broker := sarama.NewMockBroker(t, brokerID)
broker.SetLatency(connectivityTimeout + time.Second)

return &Producer{
config: ProducerConfig{
ClusterArn: new(string),
BrokersConnectionString: broker.Addr(),
},
clusterDescriber: &clusterDescriberMock{
describeCluster: func(ctx context.Context, input *kafka.DescribeClusterV2Input, optFns ...func(*kafka.Options)) (*kafka.DescribeClusterV2Output, error) {
return &kafka.DescribeClusterV2Output{
ClusterInfo: &types.Cluster{
State: types.ClusterStateMaintenance,
},
}, nil
},
},
}
},
expectedErr: "",
},
}

for _, test := range tests {
Expand Down

0 comments on commit 2de4d54

Please sign in to comment.