Skip to content

Commit

Permalink
Simplify Consumer & Producer types
Browse files Browse the repository at this point in the history
  • Loading branch information
atanasdinov committed Oct 27, 2021
1 parent 032c7d6 commit def047d
Show file tree
Hide file tree
Showing 9 changed files with 324 additions and 387 deletions.
2 changes: 1 addition & 1 deletion .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ jobs:
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: localhost:2181
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Expand Down
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ Creating a producer:
retryInterval := time.Second // Duration between each retry for establishing connection

logger := logger.NewUPPLogger(...)
producer := kafka.NewPerseverantProducer(config, logger, initialDelay, retryInterval)

producer := kafka.NewProducer(config, logger, initialDelay, retryInterval)
```

The connection to Kafka is started in a separate go routine when creating the producer.
Expand Down Expand Up @@ -73,7 +74,9 @@ Creating a consumer:

retryInterval := time.Second // Duration between each retry for establishing connection

consumer := kafka.NewPerseverantConsumer(config, logger, retryInterval)
logger := logger.NewUPPLogger(...)

consumer := kafka.NewConsumer(config, logger, retryInterval)
```

Consuming messages:
Expand Down
127 changes: 92 additions & 35 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"math/rand"
"strings"
"sync"
"time"

"github.com/Financial-Times/go-logger/v2"
Expand All @@ -13,44 +14,84 @@ import (

const errConsumerNotConnected = "consumer is not connected to Kafka"

// messageConsumer represents the library's main kafka consumer.
type messageConsumer struct {
config *ConsumerConfig
consumerGroup sarama.ConsumerGroup
logger *logger.UPPLogger
closed chan struct{}
// Consumer which will keep trying to reconnect to Kafka on a specified interval.
// The underlying consumer group is created lazily when message listening is started.
type Consumer struct {
config ConsumerConfig
consumerGroupLock *sync.RWMutex
consumerGroup sarama.ConsumerGroup
retryInterval time.Duration
logger *logger.UPPLogger
closed chan struct{}
}

// newConsumer creates a new consumer instance using a Sarama ConsumerGroup to connect to Kafka.
func newConsumer(config ConsumerConfig, log *logger.UPPLogger) (*messageConsumer, error) {
log.Debug("Creating new consumer group")
type ConsumerConfig struct {
BrokersConnectionString string
ConsumerGroup string
Topics []string
Options *sarama.Config
}

if config.Options == nil {
config.Options = DefaultConsumerOptions()
func NewConsumer(config ConsumerConfig, log *logger.UPPLogger, retryInterval time.Duration) *Consumer {
consumer := &Consumer{
config: config,
consumerGroupLock: &sync.RWMutex{},
retryInterval: retryInterval,
logger: log,
closed: make(chan struct{}),
}
return consumer
}

brokers := strings.Split(config.BrokersConnectionString, ",")
consumerGroup, err := sarama.NewConsumerGroup(brokers, config.ConsumerGroup, config.Options)
if err != nil {
return nil, err
// connect will attempt to create a new consumer continuously until successful.
func (c *Consumer) connect() {
connectorLog := c.logger.
WithField("brokers", c.config.BrokersConnectionString).
WithField("topics", c.config.Topics).
WithField("consumer_group", c.config.ConsumerGroup)

for {
consumerGroup, err := newConsumerGroup(c.config)
if err == nil {
connectorLog.Info("Connected to Kafka consumer group")
c.setConsumerGroup(consumerGroup)
break
}

connectorLog.WithError(err).Warn("Error creating Kafka consumer group")
time.Sleep(c.retryInterval)
}
}

return &messageConsumer{
config: &config,
consumerGroup: consumerGroup,
logger: log,
closed: make(chan struct{}),
}, nil
// setConsumerGroup sets the Consumer's consumer group.
func (c *Consumer) setConsumerGroup(consumerGroup sarama.ConsumerGroup) {
c.consumerGroupLock.Lock()
defer c.consumerGroupLock.Unlock()

c.consumerGroup = consumerGroup
}

// startListening will start listening for message from Kafka.
func (c *messageConsumer) startListening(messageHandler func(message FTMessage)) {
// isConnected returns whether the consumer group is set.
// It is only set if a successful connection is established.
func (c *Consumer) isConnected() bool {
c.consumerGroupLock.RLock()
defer c.consumerGroupLock.RUnlock()

return c.consumerGroup != nil
}

// StartListening is a blocking call that tries to establish a connection to Kafka and then starts listening.
func (c *Consumer) StartListening(messageHandler func(message FTMessage)) {
if !c.isConnected() {
c.connect()
}

handler := newConsumerHandler(c.logger, messageHandler)

go func() {
for err := range c.consumerGroup.Errors() {
c.logger.WithError(err).
WithField("method", "startListening").
WithField("method", "StartListening").
Error("Error processing message")
}
}()
Expand All @@ -61,7 +102,7 @@ func (c *messageConsumer) startListening(messageHandler func(message FTMessage))
for {
if err := c.consumerGroup.Consume(ctx, c.config.Topics, handler); err != nil {
c.logger.WithError(err).
WithField("method", "startListening").
WithField("method", "StartListening").
Error("Error starting consumer")
}
// check if context was cancelled, signaling that the consumer should stop
Expand All @@ -87,32 +128,48 @@ func (c *messageConsumer) startListening(messageHandler func(message FTMessage))
c.logger.Info("Starting consumer...")
}

// close closes the consumer's connection to Kafka
// It should be called before terminating the process.
func (c *messageConsumer) close() error {
close(c.closed)
// Close closes the consumer connection if it exists.
func (c *Consumer) Close() error {
if c.isConnected() {
close(c.closed)

return c.consumerGroup.Close()
}

return c.consumerGroup.Close()
return nil
}

// connectivityCheck tries to establish a new Kafka connection with a separate consumer group
// The consumer's existing connection is automatically repaired after any interruption.
func (c *messageConsumer) connectivityCheck() error {
// ConnectivityCheck checks whether a connection to Kafka can be established.
func (c *Consumer) ConnectivityCheck() error {
if !c.isConnected() {
return fmt.Errorf(errConsumerNotConnected)
}

config := ConsumerConfig{
BrokersConnectionString: c.config.BrokersConnectionString,
ConsumerGroup: fmt.Sprintf("healthcheck-%d", rand.Intn(100)),
Topics: c.config.Topics,
Options: c.config.Options,
}
healthCheckConsumer, err := newConsumer(config, c.logger)
consumerGroup, err := newConsumerGroup(config)
if err != nil {
return err
}
_ = healthCheckConsumer.close()

_ = consumerGroup.Close()

return nil
}

func newConsumerGroup(config ConsumerConfig) (sarama.ConsumerGroup, error) {
if config.Options == nil {
config.Options = DefaultConsumerOptions()
}

brokers := strings.Split(config.BrokersConnectionString, ",")
return sarama.NewConsumerGroup(brokers, config.ConsumerGroup, config.Options)
}

// DefaultConsumerOptions returns a new sarama configuration with predefined default settings.
func DefaultConsumerOptions() *sarama.Config {
config := sarama.NewConfig()
Expand Down
91 changes: 34 additions & 57 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,80 +10,72 @@ import (
"github.com/Financial-Times/go-logger/v2"
"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

const (
brokerURL = "localhost:29092"
testConsumerGroup = "testgroup"
)
const testConsumerGroup = "testgroup"

var messages = []*sarama.ConsumerMessage{{Value: []byte("Message1")}, {Value: []byte("Message2")}}

func TestNewConsumer(t *testing.T) {
func TestConsumerGroup_KafkaConnection(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
}

log := logger.NewUPPLogger("test", "INFO")
config := ConsumerConfig{
BrokersConnectionString: brokerURL,
BrokersConnectionString: testBrokers,
ConsumerGroup: testConsumerGroup,
Topics: []string{testTopic},
Options: DefaultConsumerOptions(),
}

consumer, err := newConsumer(config, log)
assert.NoError(t, err)

err = consumer.connectivityCheck()
assert.NoError(t, err)
consumerGroup, err := newConsumerGroup(config)
require.NoError(t, err)

err = consumer.close()
assert.NoError(t, err)
assert.NoError(t, consumerGroup.Close())
}

func TestConsumerNotConnectedConnectivityCheckError(t *testing.T) {
func TestConsumer_InvalidConnection(t *testing.T) {
log := logger.NewUPPLogger("test", "INFO")
consumer := messageConsumer{
config: &ConsumerConfig{
consumer := Consumer{
config: ConsumerConfig{
BrokersConnectionString: "unknown:9092",
ConsumerGroup: testConsumerGroup,
Topics: []string{testTopic},
Options: nil,
},
logger: log,
consumerGroupLock: &sync.RWMutex{},
logger: log,
}

err := consumer.connectivityCheck()
assert.Error(t, err)
assert.Error(t, consumer.ConnectivityCheck())
}

func TestNewPerseverantConsumer(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
}

func NewKafkaConsumer(topic string) *Consumer {
log := logger.NewUPPLogger("test", "INFO")
config := ConsumerConfig{
BrokersConnectionString: brokerURL,
BrokersConnectionString: testBrokers,
ConsumerGroup: testConsumerGroup,
Topics: []string{testTopic},
Topics: []string{topic},
Options: DefaultConsumerOptions(),
}

consumer := NewPerseverantConsumer(config, log, time.Second)
return NewConsumer(config, log, time.Second)
}

consumer.StartListening(func(msg FTMessage) {})
func TestKafkaConsumer_StartListening(t *testing.T) {
if testing.Short() {
t.Skip("Skipping test as it requires a connection to Kafka.")
}

time.Sleep(3 * time.Second)
consumer := NewKafkaConsumer(testTopic)

err := consumer.ConnectivityCheck()
assert.NoError(t, err)
go consumer.StartListening(func(msg FTMessage) {})
time.Sleep(5 * time.Second)

time.Sleep(3 * time.Second)
require.NoError(t, consumer.ConnectivityCheck())

err = consumer.Close()
assert.NoError(t, err)
assert.NoError(t, consumer.Close())
}

type MockConsumerGroupClaim struct {
Expand Down Expand Up @@ -179,14 +171,15 @@ func (m *MockConsumerGroupSession) Context() context.Context {
return context.TODO()
}

func NewTestConsumer() *messageConsumer {
func NewMockConsumer() *Consumer {
log := logger.NewUPPLogger("test", "INFO")
return &messageConsumer{
config: &ConsumerConfig{
return &Consumer{
config: ConsumerConfig{
Topics: []string{"topic"},
ConsumerGroup: "group",
BrokersConnectionString: "node",
},
consumerGroupLock: &sync.RWMutex{},
consumerGroup: &MockConsumerGroup{
messages: messages,
},
Expand All @@ -195,24 +188,9 @@ func NewTestConsumer() *messageConsumer {
}
}

func TestMessageConsumer_StartListening(t *testing.T) {
func TestConsumer_StartListening(t *testing.T) {
var count int32
consumer := NewTestConsumer()

consumer.startListening(func(msg FTMessage) {
atomic.AddInt32(&count, 1)
})

time.Sleep(1 * time.Second)
assert.Equal(t, int32(len(messages)), atomic.LoadInt32(&count))
}

func TestPerseverantConsumerListensToConsumer(t *testing.T) {
var count int32
consumer := PerseverantConsumer{
consumerLock: &sync.RWMutex{},
consumer: NewTestConsumer(),
}
consumer := NewMockConsumer()

consumer.StartListening(func(msg FTMessage) {
atomic.AddInt32(&count, 1)
Expand All @@ -221,6 +199,5 @@ func TestPerseverantConsumerListensToConsumer(t *testing.T) {
time.Sleep(1 * time.Second)
assert.Equal(t, int32(len(messages)), atomic.LoadInt32(&count))

err := consumer.Close()
assert.NoError(t, err)
assert.NoError(t, consumer.Close())
}
Loading

0 comments on commit def047d

Please sign in to comment.