-
Notifications
You must be signed in to change notification settings - Fork 126
/
async.go
126 lines (114 loc) · 3.89 KB
/
async.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
package pulsarutils
import (
"context"
"errors"
"fmt"
"sync"
"time"
commonmetrics "github.com/G-Research/armada/internal/common/ingest/metrics"
"github.com/apache/pulsar-client-go/pulsar"
"github.com/sirupsen/logrus"
"github.com/G-Research/armada/internal/common/logging"
)
// ConsumerMessageId wraps a pulsar message id and an identifier for the consumer which originally received the
// corresponding message. This exists because we need to track which messages came from which consumers so that
// we can ACK them on the correct consumer.
type ConsumerMessageId struct {
// MessageId is the Pulsar message id to acknowledge; Ack passes it to the consumer's AckID.
MessageId pulsar.MessageID
// Index is not read by any code visible in this file.
// NOTE(review): presumably a position/sequence number associated with the message; confirm against callers.
Index int64
// ConsumerId selects the consumer to ack on. Ack uses it as an index into its consumers slice and
// panics if it falls outside [0, len(consumers)).
ConsumerId int
}
// ConsumerMessage wraps a pulsar message and an identifier for the consumer which originally received the
// corresponding message. This exists because we need to track which messages came from which consumers so that
// we can ACK them on the correct consumer.
type ConsumerMessage struct {
// Message is the message as received from Pulsar.
Message pulsar.Message
// ConsumerId identifies the consumer that received Message.
// NOTE(review): presumably the same slice index that ConsumerMessageId.ConsumerId carries; confirm against callers.
ConsumerId int
}
// log is the package-level logger used by Receive and Ack. It is an entry on logrus's standard logger.
var log = logrus.NewEntry(logrus.StandardLogger())
// Receive starts a background goroutine that pulls messages from consumer and forwards them on the
// returned unbuffered channel.
//
// Each call to consumer.Receive is bounded by receiveTimeout; hitting that timeout only means no
// message was available, and the loop tries again. Any other receive error is recorded via
// m.RecordPulsarConnectionError, logged, and retried after backoffTime. About once a minute the
// goroutine logs how many messages it received in the interval, the id of the last message, and the
// time elapsed since that message's publish time.
//
// The goroutine exits and closes the returned channel once ctx is cancelled. Cancellation is honoured
// while backing off and while waiting for a reader on the channel; in the latter case the message in
// hand is not forwarded.
func Receive(
	ctx context.Context,
	consumer pulsar.Consumer,
	receiveTimeout time.Duration,
	backoffTime time.Duration,
	m *commonmetrics.Metrics,
) chan pulsar.Message {
	out := make(chan pulsar.Message)
	go func() {
		// Periodically log the number of processed messages.
		logInterval := 60 * time.Second
		lastLogged := time.Now()
		numReceived := 0
		var lastMessageId pulsar.MessageID
		lastPublishTime := time.Now()

		// Run until ctx is cancelled.
		for {
			// Periodic logging.
			if time.Since(lastLogged) > logInterval {
				log.WithFields(
					logrus.Fields{
						"received":      numReceived,
						"interval":      logInterval,
						"lastMessageId": lastMessageId,
						"timeLag":       time.Since(lastPublishTime),
					},
				).Info("message statistics")
				numReceived = 0
				lastLogged = time.Now()
			}

			// Single exit point: every path below that notices cancellation loops back here.
			if ctx.Err() != nil {
				log.Infof("Shutting down pulsar receiver")
				close(out)
				return
			}

			// Get a message from Pulsar, which consists of a sequence of events (i.e., state transitions).
			ctxWithTimeout, cancel := context.WithTimeout(ctx, receiveTimeout)
			msg, err := consumer.Receive(ctxWithTimeout)
			cancel()

			if err != nil {
				switch {
				case ctx.Err() != nil:
					// The caller cancelled ctx while we were waiting. This is a shutdown, not a
					// connection problem, so don't count it as one or back off.
				case errors.Is(err, context.DeadlineExceeded):
					log.Debugf("No message received") // expected
				default:
					// If receiving fails, try again in the hope that the problem is transient.
					// We don't need to distinguish between errors here, since any error means this function can't proceed.
					m.RecordPulsarConnectionError()
					logging.
						WithStacktrace(log, err).
						WithField("lastMessageId", lastMessageId).
						Warnf("Pulsar receive failed; backing off for %s", backoffTime)

					// Back off, but wake up early if ctx is cancelled so shutdown isn't delayed.
					timer := time.NewTimer(backoffTime)
					select {
					case <-timer.C:
					case <-ctx.Done():
						timer.Stop()
					}
				}
				continue
			}

			numReceived++
			lastPublishTime = msg.PublishTime()
			lastMessageId = msg.ID()

			// Hand the message over, but don't block forever if the reader has gone away:
			// on cancellation fall through to the shutdown check at the top of the loop.
			select {
			case out <- msg:
			case <-ctx.Done():
			}
		}
	}()
	return out
}
// Ack drains the msgs channel and acknowledges every message id it carries. Each id names, via its
// ConsumerId, the position in consumers of the consumer that must perform the ack. A ConsumerId outside
// the bounds of consumers is treated as a programming error and panics. Acks are issued sequentially;
// they could be parallelised, but it's unlikely they would ever be a performance bottleneck.
//
// Ack returns once msgs has been closed and drained, calling wg.Done() on the way out. ctx is accepted
// but not consulted.
func Ack(ctx context.Context, consumers []pulsar.Consumer, msgs chan []*ConsumerMessageId, wg *sync.WaitGroup) {
	for batch := range msgs {
		for i := range batch {
			ref := batch[i]
			if ref.ConsumerId >= 0 && ref.ConsumerId < len(consumers) {
				consumers[ref.ConsumerId].AckID(ref.MessageId)
				continue
			}
			// Reaching this point means a caller produced an id for a consumer we were never given.
			// That should never happen, so fail loudly.
			panic(
				fmt.Sprintf(
					"Asked to ack message belonging to consumer %d, however this is outside the bounds of the consumers array which is of length %d",
					ref.ConsumerId, len(consumers)))
		}
	}
	log.Info("Shutting down Ackker")
	wg.Done()
}