// Copyright 2024 The Cockroach Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package kafka

import (
	"context"
	"fmt"
	"strconv"
	"time"

	"github.com/IBM/sarama"
	"github.com/cockroachdb/cdc-sink/internal/script"
	"github.com/cockroachdb/cdc-sink/internal/types"
	"github.com/cockroachdb/cdc-sink/internal/util/hlc"
	"github.com/cockroachdb/cdc-sink/internal/util/ident"
	log "github.com/sirupsen/logrus"
)

var _ sarama.ConsumerGroupHandler = &Handler{}

// partitionState tracks the offset to resume from for a topic partition.
type partitionState struct {
	topic     string
	partition int32
	offset    int64
}

// Handler represents a Sarama consumer group consumer.
type Handler struct {
	// The destination for writes.
	acceptor types.MultiAcceptor
	// Maximum number of mutations to accumulate before flushing a batch.
	batchSize int
	// The destination schema for incoming mutations.
	target    ident.Schema
	watchers  types.Watchers
	// Only mutations whose timestamps fall within this range are applied.
	timeRange hlc.Range
	// Initial offsets to mark when a new session starts.
	fromState []*partitionState
}
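
// A minimal wiring sketch (illustrative only; the broker address, group id,
// topic, and Kafka version below are assumptions, not values used by this
// package): the Handler is passed to sarama.ConsumerGroup.Consume, which calls
// Setup, ConsumeClaim, and Cleanup for each session and must be re-invoked
// after a rebalance.
//
//	config := sarama.NewConfig()
//	config.Version = sarama.V2_1_0_0
//	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "cdc-sink", config)
//	if err != nil {
//		log.Fatal(err)
//	}
//	handler := &Handler{ /* acceptor, target, batchSize, ... */ }
//	for ctx.Err() == nil {
//		// Consume returns when the session ends (e.g. on rebalance); loop to rejoin.
//		if err := group.Consume(ctx, []string{"my_table"}, handler); err != nil {
//			log.WithError(err).Warn("consume ended")
//		}
//	}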

// Setup is run at the beginning of a new session, before ConsumeClaim.
func (c *Handler) Setup(session sarama.ConsumerGroupSession) error {
	// If the startup options provide a minTimestamp, we mark the offset for each topic and
	// partition at the provided timestamp or the latest read message, whichever is later.
	// If the process restarts, we can then resume from the latest committed message
	// without changing the startup command.
	// TODO (silvano): Should we have a --force option to restart from the provided minTimestamp?
	// Using a different group id would have the same effect.
	for _, marker := range c.fromState {
		log.Debugf("setup: marking offset %s@%d to %d", marker.topic, marker.partition, marker.offset)
		session.MarkOffset(marker.topic, marker.partition, marker.offset, "start")
	}
	return nil
}

// Cleanup is run at the end of a session, once all ConsumeClaim goroutines have exited.
func (c *Handler) Cleanup(session sarama.ConsumerGroupSession) error {
	if session.Context().Err() != nil {
		log.WithError(session.Context().Err()).Error("Session terminated with an error")
	}
	return session.Context().Err()
}

// ConsumeClaim processes new messages for the topic/partition specified in the claim.
func (c *Handler) ConsumeClaim(
	session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim,
) (err error) {
	log.Debugf("ConsumeClaim topic=%s partition=%d offset=%d", claim.Topic(), claim.Partition(), claim.InitialOffset())
	// Aggregate the mutations by target table.
	toProcess := &types.MultiBatch{}
	// Track the last message received for each topic/partition.
	consumed := make(map[string]*sarama.ConsumerMessage)
	ctx := session.Context()
	// Do not move the code below to a goroutine.
	// `ConsumeClaim` itself is already called within a goroutine, see:
	// https://github.com/IBM/sarama/blob/main/consumer_group.go#L27-L29
	for {
		select {
		case message, ok := <-claim.Messages():
			if !ok {
				log.Debugf("message channel for topic=%s partition=%d was closed", claim.Topic(), claim.Partition())
				return nil
			}
			partition := strconv.Itoa(int(claim.Partition()))
			mutationsReceivedCount.WithLabelValues(claim.Topic(), partition).Inc()
			if err = c.accumulate(toProcess, message); err != nil {
				mutationsErrorCount.WithLabelValues(claim.Topic(), partition).Inc()
				return err
			}
			mutationsSuccessCount.WithLabelValues(claim.Topic(), partition).Inc()
			consumed[fmt.Sprintf("%s@%d", message.Topic, message.Partition)] = message
			// Flush a batch, and mark the latest message for each topic/partition as read.
			if toProcess.Count() > c.batchSize {
				if err = c.accept(ctx, toProcess); err != nil {
					return err
				}
				toProcess = toProcess.Empty()
				c.mark(session, consumed)
			}
		// We should return when `session.Context()` is done. If we don't, Sarama will
		// raise `ErrRebalanceInProgress` or `read tcp <ip>:<port>: i/o timeout` when a
		// Kafka rebalance occurs. See:
		// https://github.com/IBM/sarama/issues/1192
		case <-ctx.Done():
			return nil
		case <-time.After(time.Second):
			// Periodically flush a batch, and mark the latest message for each topic/partition as consumed.
			if err = c.accept(ctx, toProcess); err != nil {
				return err
			}
			toProcess = toProcess.Empty()
			c.mark(session, consumed)
		}
	}
}

// mark marks the offset of each tracked message as consumed and removes its entry
// from the map that tracks the latest message received for each topic/partition.
func (c *Handler) mark(
	session sarama.ConsumerGroupSession, consumed map[string]*sarama.ConsumerMessage,
) {
	for key, message := range consumed {
		// MarkMessage marks the message as read; the offset is committed by Sarama
		// according to the consumer's offset auto-commit configuration.
		session.MarkMessage(message, "")
		delete(consumed, key)
	}
}

// accept processes a batch.
func (c *Handler) accept(ctx context.Context, toProcess *types.MultiBatch) error {
	if toProcess.Count() == 0 {
		// Nothing to do.
		return nil
	}
	log.Tracef("flushing %d", toProcess.Count())
	if err := c.acceptor.AcceptMultiBatch(ctx, toProcess, &types.AcceptOptions{}); err != nil {
		return err
	}
	return nil
}
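
// An illustrative sketch of the JSON envelopes accumulate expects (the exact
// shape is determined by asPayload elsewhere in this package, not here): data
// messages carry "after"/"before"/"updated" fields, while resolved messages
// carry only a "resolved" timestamp.
//
//	{"after": {"id": 1, "v": "a"}, "updated": "1718000000000000000.0000000000"}
//	{"resolved": "1718000000000000000.0000000000"}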

// accumulate adds the message to the batch, after converting it to a types.Mutation.
// Resolved messages are skipped.
func (c *Handler) accumulate(toProcess *types.MultiBatch, msg *sarama.ConsumerMessage) error {
	payload, err := asPayload(msg)
	if err != nil {
		return err
	}
	if payload.Resolved != "" {
		log.Tracef("Resolved %s %d [%s@%d]", payload.Resolved, msg.Timestamp.Unix(), msg.Topic, msg.Partition)
		return nil
	}
	log.Tracef("Mutation %s %d [%s@%d]", string(msg.Key), msg.Timestamp.Unix(), msg.Topic, msg.Partition)
	timestamp, err := hlc.Parse(payload.Updated)
	if err != nil {
		return err
	}
	table, qual, err := ident.ParseTableRelative(msg.Topic, c.target.Schema())
	if err != nil {
		return err
	}
	// Ensure the destination table is in the target schema.
	if qual != ident.TableOnly {
		table = ident.NewTable(c.target.Schema(), table.Table())
	}
	if !c.timeRange.Contains(timestamp) {
		log.Debugf("skipping mutation %s@%d %s %s", string(msg.Key), msg.Offset, timestamp, c.timeRange)
		return nil
	}
	mut := types.Mutation{
		Before: payload.Before,
		Data:   payload.After,
		Key:    msg.Key,
		Time:   timestamp,
	}
	script.AddMeta("kafka", table, &mut)
	log.Debugf("adding mutation %s", string(msg.Key))
	return toProcess.Accumulate(table, mut)
}