-
Notifications
You must be signed in to change notification settings - Fork 327
/
consumer.go
339 lines (268 loc) · 14 KB
/
consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package pulsar
import (
"context"
"time"
"github.com/apache/pulsar-client-go/pulsar/internal"
)
// ConsumerMessage represents a pair of a Consumer and Message.
type ConsumerMessage struct {
Consumer
Message
}
// SubscriptionType of subscription supported by Pulsar
type SubscriptionType int
const (
// Exclusive there can be only 1 consumer on the same topic with the same subscription name
Exclusive SubscriptionType = iota
// Shared subscription mode, multiple consumer will be able to use the same subscription name
// and the messages will be dispatched according to
// a round-robin rotation between the connected consumers
Shared
// Failover subscription mode, multiple consumer will be able to use the same subscription name
// but only 1 consumer will receive the messages.
// If that consumer disconnects, one of the other connected consumers will start receiving messages.
Failover
// KeyShared subscription mode, multiple consumer will be able to use the same
// subscription and all messages with the same key will be dispatched to only one consumer
KeyShared
)
type SubscriptionInitialPosition int
const (
// SubscriptionPositionLatest is the latest position which means the start consuming position
// will be the last message
SubscriptionPositionLatest SubscriptionInitialPosition = iota
// SubscriptionPositionEarliest is the earliest position which means the start consuming position
// will be the first message
SubscriptionPositionEarliest
)
// DLQPolicy represents the configuration for the Dead Letter Queue consumer policy.
type DLQPolicy struct {
// MaxDeliveries specifies the maximum number of times that a message will be delivered before being
// sent to the dead letter queue.
MaxDeliveries uint32
// DeadLetterTopic specifies the name of the topic where the failing messages will be sent.
DeadLetterTopic string
// ProducerOptions is the producer options to produce messages to the DLQ and RLQ topic
ProducerOptions ProducerOptions
// RetryLetterTopic specifies the name of the topic where the retry messages will be sent.
RetryLetterTopic string
}
// AckGroupingOptions controls how to group ACK requests
// If maxSize is 0 or 1, any ACK request will be sent immediately.
// Otherwise, the ACK requests will be cached until one of the following conditions meets:
// 1. There are `MaxSize` pending ACK requests.
// 2. `MaxTime` is greater than 1 microsecond and ACK requests have been cached for `maxTime`.
// Specially, for cumulative acknowledgment, only the latest ACK is cached and it will only be sent after `MaxTime`.
type AckGroupingOptions struct {
// The maximum number of ACK requests to cache
MaxSize uint32
// The maximum time to cache ACK requests
MaxTime time.Duration
}
// ConsumerOptions is used to configure and create instances of Consumer.
type ConsumerOptions struct {
// Topic specifies the topic this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topic string
// Topics specifies a list of topics this consumer will subscribe on.
// Either a topic, a list of topics or a topics pattern are required when subscribing
Topics []string
// TopicsPattern specifies a regular expression to subscribe to multiple topics under the same namespace.
// Either a topic, a list of topics or a topics pattern are required when subscribing
TopicsPattern string
// AutoDiscoveryPeriod specifies the interval in which to poll for new partitions or new topics
// if using a TopicsPattern.
AutoDiscoveryPeriod time.Duration
// SubscriptionName specifies the subscription name for this consumer
// This argument is required when subscribing
SubscriptionName string
// Properties represents a set of application defined properties for the consumer.
// Those properties will be visible in the topic stats
Properties map[string]string
// SubscriptionProperties specify the subscription properties for this subscription.
//
// > Notice: SubscriptionProperties are immutable, and consumers under the same subscription will fail to create a
// > subscription if they use different properties.
SubscriptionProperties map[string]string
// Type specifies the subscription type to be used when subscribing to a topic.
// Default is `Exclusive`
Type SubscriptionType
// SubscriptionInitialPosition is the initial position at which the cursor will be set when subscribe
// Default is `Latest`
SubscriptionInitialPosition
// EventListener will be called when active consumer changed (in failover subscription type)
EventListener ConsumerEventListener
// DLQ represents the configuration for Dead Letter Queue consumer policy.
// eg. route the message to topic X after N failed attempts at processing it
// By default is nil and there's no DLQ
DLQ *DLQPolicy
// KeySharedPolicy represents the configuration for Key Shared consumer policy.
KeySharedPolicy *KeySharedPolicy
// RetryEnable determines whether to automatically retry sending messages to default filled DLQPolicy topics.
// Default is false
RetryEnable bool
// MessageChannel sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage
// ReceiverQueueSize sets the size of the consumer receive queue.
// The consumer receive queue controls how many messages can be accumulated by the `Consumer` before the
// application calls `Consumer.receive()`. Using a higher value could potentially increase the consumer
// throughput at the expense of bigger memory utilization.
// Default value is `1000` messages and should be good for most use cases.
ReceiverQueueSize int
// EnableAutoScaledReceiverQueueSize, if enabled, the consumer receive queue will be auto-scaled
// by the consumer actual throughput. The ReceiverQueueSize will be the maximum size which consumer
// receive queue can be scaled.
// Default is false.
EnableAutoScaledReceiverQueueSize bool
// NackRedeliveryDelay specifies the delay after which to redeliver the messages that failed to be
// processed. Default is 1 min. (See `Consumer.Nack()`)
NackRedeliveryDelay time.Duration
// Name specifies the consumer name.
Name string
// ReadCompacted, if enabled, the consumer will read messages from the compacted topic rather than reading the
// full message backlog of the topic. This means that, if the topic has been compacted, the consumer will only
// see the latest value for each key in the topic, up until the point in the topic message backlog that has been
// compacted. Beyond that point, the messages will be sent as normal.
//
// ReadCompacted can only be enabled subscriptions to persistent topics, which have a single active consumer (i.e.
// failure or exclusive subscriptions). Attempting to enable it on subscriptions to a non-persistent topics or on a
// shared subscription, will lead to the subscription call throwing a PulsarClientException.
ReadCompacted bool
// ReplicateSubscriptionState marks the subscription as replicated to keep it in sync across clusters
ReplicateSubscriptionState bool
// Interceptors is a chain of interceptors. These interceptors will be called at some points defined in
// ConsumerInterceptor interface.
Interceptors ConsumerInterceptors
// Schema represents the schema implementation.
Schema Schema
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
// BackoffPolicy parameterize the following options in the reconnection logic to
// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
BackoffPolicy internal.BackoffPolicy
// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo
// EnableDefaultNackBackoffPolicy, if enabled, the default implementation of NackBackoffPolicy will be used
// to calculate the delay time of
// nack backoff, Default: false.
EnableDefaultNackBackoffPolicy bool
// NackBackoffPolicy is a redelivery backoff mechanism which we can achieve redelivery with different
// delays according to the number of times the message is retried.
//
// > Notice: the NackBackoffPolicy will not work with `consumer.NackID(MessageID)`
// > because we are not able to get the redeliveryCount from the message ID.
NackBackoffPolicy NackBackoffPolicy
// AckWithResponse is a return value added to Ack Command, and its purpose is to confirm whether Ack Command
// is executed correctly on the Broker side. When set to true, the error information returned by the Ack
// method contains the return value of the Ack Command processed by the Broker side; when set to false, the
// error information of the Ack method only contains errors that may occur in the Go SDK's own processing.
// Default: false
AckWithResponse bool
// MaxPendingChunkedMessage sets the maximum pending chunked messages. (default: 100)
MaxPendingChunkedMessage int
// ExpireTimeOfIncompleteChunk sets the expiry time of discarding incomplete chunked message. (default: 60 seconds)
ExpireTimeOfIncompleteChunk time.Duration
// AutoAckIncompleteChunk sets whether consumer auto acknowledges incomplete chunked message when it should
// be removed (e.g.the chunked message pending queue is full). (default: false)
AutoAckIncompleteChunk bool
// Enable or disable batch index acknowledgment. To enable this feature, ensure batch index acknowledgment
// is enabled on the broker side. (default: false)
EnableBatchIndexAcknowledgment bool
// Controls how to group ACK requests, the default value is nil, which means:
// MaxSize: 1000
// MaxTime: 100*time.Millisecond
// NOTE: This option does not work if AckWithResponse is true
// because there are only synchronous APIs for acknowledgment
AckGroupingOptions *AckGroupingOptions
// SubscriptionMode specifies the subscription mode to be used when subscribing to a topic.
// Default is `Durable`
SubscriptionMode SubscriptionMode
// StartMessageIDInclusive, if true, the consumer will start at the `StartMessageID`, included.
// Default is `false` and the consumer will start from the "next" message
StartMessageIDInclusive bool
// startMessageID specifies the message id to start from. Currently, it's only used for the reader internally.
startMessageID *trackingMessageID
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
type Consumer interface {
// Subscription get a subscription for the consumer
Subscription() string
// Unsubscribe the consumer
//
// Unsubscribing will cause the subscription to be deleted,
// and all the retained data can potentially be deleted based on message retention and ttl policy.
//
// This operation will fail when performed on a shared subscription
// where more than one consumer are currently connected.
Unsubscribe() error
// Receive a single message.
// This calls blocks until a message is available.
Receive(context.Context) (Message, error)
// Chan returns a channel to consume messages from
Chan() <-chan ConsumerMessage
// Ack the consumption of a single message
Ack(Message) error
// AckID the consumption of a single message, identified by its MessageID
AckID(MessageID) error
// AckWithTxn the consumption of a single message with a transaction
AckWithTxn(Message, Transaction) error
// AckCumulative the reception of all the messages in the stream up to (and including)
// the provided message.
AckCumulative(msg Message) error
// AckIDCumulative the reception of all the messages in the stream up to (and including)
// the provided message, identified by its MessageID
AckIDCumulative(msgID MessageID) error
// ReconsumeLater mark a message for redelivery after custom delay
ReconsumeLater(msg Message, delay time.Duration)
// ReconsumeLaterWithCustomProperties mark a message for redelivery after custom delay with custom properties
ReconsumeLaterWithCustomProperties(msg Message, customProperties map[string]string, delay time.Duration)
// Nack acknowledges the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NackRedeliveryDelay .
//
// This call is not blocking.
Nack(Message)
// NackID acknowledges the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
// some fixed delay. The delay is configurable when constructing the consumer
// with ConsumerOptions.NackRedeliveryDelay .
//
// This call is not blocking.
NackID(MessageID)
// Close the consumer and stop the broker to push more messages
Close()
// Seek resets the subscription associated with this consumer to a specific message id.
// The message id can either be a specific message or represent the first or last messages in the topic.
//
// Note: this operation can only be done on non-partitioned topics. For these, one can rather perform the
// seek() on the individual partitions.
Seek(MessageID) error
// SeekByTime resets the subscription associated with this consumer to a specific message publish time.
//
// @param time
// the message publish time when to reposition the subscription
//
SeekByTime(time time.Time) error
// Name returns the name of consumer.
Name() string
}