Skip to content

Commit

Permalink
Consume Kafka messages since a given duration (#227)
Browse files Browse the repository at this point in the history
  • Loading branch information
teivah committed Jun 30, 2020
1 parent 2001975 commit 11f9bb2
Show file tree
Hide file tree
Showing 9 changed files with 803 additions and 11 deletions.
15 changes: 11 additions & 4 deletions component/async/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"os"
"strconv"
"time"

"github.com/Shopify/sarama"
"github.com/beatlabs/patron/component/async"
Expand All @@ -32,6 +33,9 @@ var topicPartitionOffsetDiff *prometheus.GaugeVec
var messageStatus *prometheus.CounterVec
var messageConfirmation *prometheus.CounterVec

// TimeExtractor defines a function extracting a time from a Kafka message.
// It receives the consumed message and returns the extracted time along with an error value.
// WithDurationOffset accepts a TimeExtractor to configure duration-based consumption.
type TimeExtractor func(*sarama.ConsumerMessage) (time.Time, error)

// TopicPartitionOffsetDiffGaugeSet creates a new Gauge that measures partition offsets.
func TopicPartitionOffsetDiffGaugeSet(group, topic string, partition int32, high, offset int64) {
topicPartitionOffsetDiff.WithLabelValues(group, topic, strconv.FormatInt(int64(partition), 10)).Set(float64(high - offset))
Expand Down Expand Up @@ -84,10 +88,13 @@ func init() {

// ConsumerConfig is the common configuration of patron kafka consumers.
type ConsumerConfig struct {
Brokers []string
Buffer int
DecoderFunc encoding.DecodeRawFunc
SaramaConfig *sarama.Config
Brokers []string
Buffer int
DecoderFunc encoding.DecodeRawFunc
// DurationBasedConsumer is set to true by WithDurationOffset.
DurationBasedConsumer bool
// DurationOffset is the duration passed to WithDurationOffset.
DurationOffset time.Duration
// TimeExtractor is the function passed to WithDurationOffset; it extracts a time from a Kafka message.
TimeExtractor func(*sarama.ConsumerMessage) (time.Time, error)
SaramaConfig *sarama.Config
}

type message struct {
Expand Down
17 changes: 17 additions & 0 deletions component/async/kafka/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,20 @@ func DecoderJSON() OptionFunc {
return nil
}
}

// WithDurationOffset returns an option that configures a consumer to start from a given duration.
// The timeExtractor function indicates how to extract the time from a Kafka message.
//
// The returned option fails when since is negative or when timeExtractor is nil; otherwise it
// marks the configuration as duration based and stores both arguments on it.
func WithDurationOffset(since time.Duration, timeExtractor TimeExtractor) OptionFunc {
	return func(c *ConsumerConfig) error {
		// Validate both arguments before touching the configuration.
		switch {
		case since < 0:
			return errors.New("duration must be positive")
		case timeExtractor == nil:
			return errors.New("empty time extractor function")
		}

		c.DurationOffset = since
		c.TimeExtractor = timeExtractor
		c.DurationBasedConsumer = true
		return nil
	}
}
53 changes: 53 additions & 0 deletions component/async/kafka/option_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package kafka

import (
"errors"
"reflect"
"runtime"
"testing"
"time"

Expand Down Expand Up @@ -150,3 +152,54 @@ func TestDecoderJSON(t *testing.T) {
reflect.ValueOf(c.DecoderFunc).Pointer(),
)
}

// TestWithDurationOffset checks that WithDurationOffset rejects a negative duration and a nil
// time extractor, and that a valid call populates the duration-related fields of ConsumerConfig.
func TestWithDurationOffset(t *testing.T) {
	extractor := func(_ *sarama.ConsumerMessage) (time.Time, error) {
		return time.Time{}, nil
	}

	type args struct {
		since         time.Duration
		timeExtractor TimeExtractor
	}
	testCases := map[string]struct {
		args        args
		expectedErr error
	}{
		"success": {
			args: args{since: time.Second, timeExtractor: extractor},
		},
		"error - negative since duration": {
			args:        args{since: -time.Second, timeExtractor: extractor},
			expectedErr: errors.New("duration must be positive"),
		},
		"error - nil time extractor": {
			args:        args{since: time.Second},
			expectedErr: errors.New("empty time extractor function"),
		},
	}
	for name, tt := range testCases {
		t.Run(name, func(t *testing.T) {
			cfg := ConsumerConfig{}
			err := WithDurationOffset(tt.args.since, tt.args.timeExtractor)(&cfg)

			// Failure cases only need the error message to be checked.
			if tt.expectedErr != nil {
				assert.EqualError(t, err, tt.expectedErr.Error())
				return
			}

			assert.NoError(t, err)
			assert.True(t, cfg.DurationBasedConsumer)
			assert.Equal(t, time.Second, cfg.DurationOffset)

			// Functions are not comparable, so compare the names of the underlying code pointers.
			wantName := runtime.FuncForPC(reflect.ValueOf(tt.args.timeExtractor).Pointer()).Name()
			gotName := runtime.FuncForPC(reflect.ValueOf(cfg.TimeExtractor).Pointer()).Name()
			assert.Equal(t, wantName, gotName)
		})
	}
}
130 changes: 130 additions & 0 deletions component/async/kafka/simple/duration_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package simple

import (
"context"
"errors"
"fmt"
"time"

"github.com/beatlabs/patron/component/async/kafka"
"github.com/beatlabs/patron/log"
)

// durationClient resolves, for each partition of a topic, the offset that corresponds to a point
// in time. All Kafka access goes through the wrapped durationKafkaClientAPI.
type durationClient struct {
client durationKafkaClientAPI
}

// newDurationClient wraps the given client API in a durationClient.
// It returns an error, together with a zero-value durationClient, when client is nil.
func newDurationClient(client durationKafkaClientAPI) (durationClient, error) {
	var dc durationClient
	if client == nil {
		return dc, errors.New("empty client api")
	}
	dc.client = client
	return dc, nil
}

// getTimeBasedOffsetsPerPartition resolves, for every partition of topic, the offset matching since.
// The time of a message is obtained through timeExtractor.
// It returns a map from partition ID to offset, or the first error encountered while listing the
// partitions or collecting the per-partition results.
func (d durationClient) getTimeBasedOffsetsPerPartition(ctx context.Context, topic string, since time.Time, timeExtractor kafka.TimeExtractor) (map[int32]int64, error) {
	ids, err := d.client.getPartitionIDs(topic)
	if err != nil {
		return nil, err
	}

	// One buffer slot per partition: each worker sends at most one response.
	responses := make(chan partitionOffsetResponse, len(ids))
	d.triggerWorkers(ctx, topic, since, timeExtractor, ids, responses)

	offsets, err := d.aggregateResponses(ctx, ids, responses)
	return offsets, err
}

// partitionOffsetResponse carries the outcome of resolving the offset of a single partition.
// Workers started by triggerWorkers send one such value per partition.
type partitionOffsetResponse struct {
// partitionID identifies the partition the result belongs to.
partitionID int32
// offset is the offset returned by getTimeBasedOffset for the partition.
offset int64
// err is the error returned by getTimeBasedOffset for the partition, if any.
err error
}

// triggerWorkers starts one goroutine per partition ID. Each goroutine resolves the time-based
// offset of its partition and publishes the outcome (offset or error) on responseCh, unless the
// context is done first. The function itself returns without waiting for the goroutines.
func (d durationClient) triggerWorkers(ctx context.Context, topic string, since time.Time, timeExtractor kafka.TimeExtractor, partitionIDs []int32, responseCh chan<- partitionOffsetResponse) {
	for _, partitionID := range partitionIDs {
		// The partition ID is passed as an argument so every goroutine works on its own copy.
		go func(id int32) {
			offset, err := d.getTimeBasedOffset(ctx, topic, since, id, timeExtractor)
			result := partitionOffsetResponse{
				partitionID: id,
				offset:      offset,
				err:         err,
			}
			select {
			case responseCh <- result:
			case <-ctx.Done():
			}
		}(partitionID)
	}
}

// aggregateResponses collects exactly one response per entry of partitionIDs from responseCh and
// returns the resulting map from partition ID to offset.
//
// It returns early with the first response error it receives, or with a wrapped context error
// when ctx is done before all responses have been collected.
// When partitionIDs is empty there is nothing to wait for, so an empty map is returned
// immediately instead of blocking until the context is cancelled.
func (d durationClient) aggregateResponses(ctx context.Context, partitionIDs []int32, responseCh <-chan partitionOffsetResponse) (map[int32]int64, error) {
	numberOfPartitions := len(partitionIDs)
	offsets := make(map[int32]int64, numberOfPartitions)

	// Bounding the loop by the number of expected responses (rather than checking the count only
	// after a receive) guarantees termination for an empty partition list.
	for numberOfResponses := 0; numberOfResponses < numberOfPartitions; numberOfResponses++ {
		select {
		case <-ctx.Done():
			return nil, fmt.Errorf("context cancelled before collecting partition responses: %w", ctx.Err())
		case response := <-responseCh:
			if response.err != nil {
				return nil, response.err
			}
			offsets[response.partitionID] = response.offset
		}
	}

	return offsets, nil
}

// getTimeBasedOffset resolves the offset matching since on a single partition of topic.
// It fetches the oldest and newest offsets of the partition and runs a binary search between them,
// using timeExtractor to obtain the time of the inspected messages.
// Any error from the offset lookups or from the search is returned with a zero offset.
func (d durationClient) getTimeBasedOffset(ctx context.Context, topic string, since time.Time, partitionID int32, timeExtractor kafka.TimeExtractor) (int64, error) {
	oldest, err := d.client.getOldestOffset(topic, partitionID)
	if err != nil {
		return 0, err
	}

	newest, err := d.client.getNewestOffset(topic, partitionID)
	if err != nil {
		return 0, err
	}

	// The search interval is inclusive on both ends, hence the right boundary is newest-1.
	return d.offsetBinarySearch(ctx, topic, since, partitionID, timeExtractor, oldest, newest-1)
}

// offsetBinarySearch searches the inclusive offset interval [left, right] of one partition for the
// earliest offset whose extracted time is not before since.
//
// The time of each inspected message is obtained through timeExtractor. When every inspected
// message is before since, the returned offset is one past the last inspected boundary (i.e. the
// initial right+1 when the interval was never narrowed from the right); when the interval is
// empty (left > right) the initial left is returned.
//
// The search deliberately does not stop at the first message whose time equals since: several
// consecutive messages may carry the same time, and stopping at an arbitrary one of them would
// skip the earlier ones. It keeps narrowing to the left until the earliest match is found.
//
// An error is returned when a message cannot be retrieved (other than for an out-of-range offset)
// or when timeExtractor fails.
func (d durationClient) offsetBinarySearch(ctx context.Context, topic string, since time.Time, partitionID int32, timeExtractor kafka.TimeExtractor, left, right int64) (int64, error) {
	for left <= right {
		// Written this way to avoid overflowing on left+right.
		mid := left + (right-left)/2

		msg, err := d.client.getMessageAtOffset(ctx, topic, partitionID, mid)
		if err != nil {
			// Under extraordinary circumstances (e.g. the retention policy being applied just before retrieving the message at a particular offset),
			// the offset might not be accessible anymore.
			// In this case, we simply log a warning and restrict the interval to the right.
			// NOTE(review): errors.Is against a freshly allocated pointer only matches if the error
			// type provides an Is method; confirm against the definition of outOfRangeOffsetError.
			if errors.Is(err, &outOfRangeOffsetError{}) {
				log.Warnf("offset %d on partition %d is out of range: %v", mid, partitionID, err)
				left = mid + 1
				continue
			}
			return 0, fmt.Errorf("error while retrieving message offset %d on partition %d: %w", mid, partitionID, err)
		}

		t, err := timeExtractor(msg)
		if err != nil {
			return 0, fmt.Errorf("error while executing comparator: %w", err)
		}

		if t.Before(since) {
			// Everything up to and including mid is too old.
			left = mid + 1
		} else {
			// mid is a candidate (its time is equal to or after since), but an earlier offset may
			// qualify as well, so continue in the left half. left ends on the earliest candidate.
			right = mid - 1
		}
	}

	return left, nil
}
Loading

0 comments on commit 11f9bb2

Please sign in to comment.