forked from harlow/kinesis-consumer
/
broker.go
114 lines (97 loc) · 2.66 KB
/
broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
package consumer
import (
"context"
"fmt"
"sync"
"time"
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/service/kinesis"
"github.com/aws/aws-sdk-go/service/kinesis/kinesisiface"
)
// newBroker returns a broker that stores the given Kinesis client, stream
// name, shard channel and logger, and starts out with an empty shard cache.
func newBroker(
	client kinesisiface.KinesisAPI,
	streamName string,
	shardc chan *kinesis.Shard,
	logger Logger,
) *broker {
	b := new(broker)
	b.client = client
	b.streamName = streamName
	b.shardc = shardc
	b.logger = logger
	// The cache must be a non-nil map: findNewShards writes to it.
	b.shards = map[string]*kinesis.Shard{}
	return b
}
// broker caches a local list of the shards we are already processing
// and routinely polls the stream looking for new shards to process.
// Shards that are not yet in the cache are sent on shardc.
type broker struct {
// client is the Kinesis API handle used to list the stream's shards.
client kinesisiface.KinesisAPI
// streamName is the name of the stream whose shards are listed.
streamName string
// shardc receives every shard the first time it is discovered.
shardc chan *kinesis.Shard
// logger receives progress and error messages from the broker.
logger Logger
// shardMu is held for the whole duration of a findNewShards scan,
// which covers every read and write of shards.
shardMu sync.Mutex
// shards is the cache of already-announced shards, keyed by shard ID.
shards map[string]*kinesis.Shard
}
// start is a blocking operation: it performs one shard scan immediately and
// then rescans every 30 seconds until ctx is cancelled.
//
// Cancellation is only observed between scans; findNewShards takes no
// context, so a scan that is already in progress is not interrupted.
func (b *broker) start(ctx context.Context) {
	b.findNewShards()

	ticker := time.NewTicker(30 * time.Second)
	// Deferred so the ticker is released on every exit path, not only on
	// the cancellation branch below.
	defer ticker.Stop()

	// Note: while a ticker is a rather naive approach to this problem,
	// it actually simplifies a few things, i.e. if we miss a new shard while
	// AWS is resharding we'll pick it up at most 30 seconds later.
	//
	// It might be worth refactoring this flow to allow the consumer
	// to notify the broker when a shard is closed. However, shards don't
	// necessarily close at the same time, so we could potentially get a
	// thundering herd of notifications from the consumer.
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			b.findNewShards()
		}
	}
}
// findNewShards lists the stream's shards and compares them against the
// local cache. Every shard whose ID is not cached yet is recorded and then
// sent on shardc. A listing error is logged and the scan is abandoned.
//
// shardMu is held for the whole scan, including the channel sends.
func (b *broker) findNewShards() {
	b.shardMu.Lock()
	defer b.shardMu.Unlock()

	b.logger.Log("[BROKER]", "fetching shards")

	listed, err := b.listShards()
	if err != nil {
		b.logger.Log("[BROKER]", err)
		return
	}

	for i := range listed {
		s := listed[i]
		id := *s.ShardId
		if _, seen := b.shards[id]; !seen {
			// Cache first, then announce, so a shard is never sent twice.
			b.shards[id] = s
			b.shardc <- s
		}
	}
}
// listShards pulls the complete list of shards for the stream from the
// Kinesis API, following NextToken pagination until no token is returned.
//
// It returns the accumulated shards, or a nil slice and an error prefixed
// with "ListShards error:" as soon as any ListShards call fails.
func (b *broker) listShards() ([]*kinesis.Shard, error) {
	var ss []*kinesis.Shard

	// The first page is addressed by stream name.
	input := &kinesis.ListShardsInput{
		StreamName: aws.String(b.streamName),
	}

	for {
		resp, err := b.client.ListShards(input)
		if err != nil {
			return nil, fmt.Errorf("ListShards error: %v", err)
		}
		ss = append(ss, resp.Shards...)

		if resp.NextToken == nil {
			return ss, nil
		}

		// Follow-up pages are addressed by NextToken alone: the ListShards
		// API does not accept StreamName together with NextToken, because
		// the token already identifies the stream.
		input = &kinesis.ListShardsInput{
			NextToken: resp.NextToken,
		}
	}
}