-
Notifications
You must be signed in to change notification settings - Fork 3.6k
/
producer.go
207 lines (172 loc) · 8.28 KB
/
producer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
//
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
//
package pulsar
import (
"context"
"time"
)
type MessageRoutingMode int
const (
// Publish messages across all partitions in round-robin.
RoundRobinDistribution MessageRoutingMode = iota
// The producer will chose one single partition and publish all the messages into that partition
UseSinglePartition
// Use custom message router implementation that will be called to determine the partition for a particular message.
CustomPartition
)
type HashingScheme int
const (
JavaStringHash HashingScheme = iota // Java String.hashCode() equivalent
Murmur3_32Hash // Use Murmur3 hashing function
BoostHash // C++ based boost::hash
)
type CompressionType int
const (
NoCompression CompressionType = iota
LZ4
ZLib
ZSTD
SNAPPY
)
type TopicMetadata interface {
// Get the number of partitions for the specific topic
NumPartitions() int
}
type ProducerOptions struct {
// Specify the topic this producer will be publishing on.
// This argument is required when constructing the producer.
Topic string
// Specify a name for the producer
// If not assigned, the system will generate a globally unique name which can be access with
// Producer.ProducerName().
// When specifying a name, it is up to the user to ensure that, for a given topic, the producer name is unique
// across all Pulsar's clusters. Brokers will enforce that only a single producer a given name can be publishing on
// a topic.
Name string
// Attach a set of application defined properties to the producer
// This properties will be visible in the topic stats
Properties map[string]string
// Set the send timeout (default: 30 seconds)
// If a message is not acknowledged by the server before the sendTimeout expires, an error will be reported.
// Setting the timeout to -1, will set the timeout to infinity, which can be useful when using Pulsar's message
// deduplication feature.
SendTimeout time.Duration
// Set the max size of the queue holding the messages pending to receive an acknowledgment from the broker.
// When the queue is full, by default, all calls to Producer.send() and Producer.sendAsync() will fail
// unless `BlockIfQueueFull` is set to true. Use BlockIfQueueFull(boolean) to change the blocking behavior.
MaxPendingMessages int
// Set the number of max pending messages across all the partitions
// This setting will be used to lower the max pending messages for each partition
// `MaxPendingMessages(int)`, if the total exceeds the configured value.
MaxPendingMessagesAcrossPartitions int
// Set whether the `Producer.Send()` and `Producer.sendAsync()` operations should block when the outgoing
// message queue is full. Default is `false`. If set to `false`, send operations will immediately fail with
// `ProducerQueueIsFullError` when there is no space left in pending queue.
BlockIfQueueFull bool
// Set the message routing mode for the partitioned producer.
// Default routing mode is round-robin routing.
//
// This logic is applied when the application is not setting a key ProducerMessage#setKey(String) on a
// particular message.
MessageRoutingMode
// Change the `HashingScheme` used to chose the partition on where to publish a particular message.
// Standard hashing functions available are:
//
// - `JavaStringHash` : Java String.hashCode() equivalent
// - `Murmur3_32Hash` : Use Murmur3 hashing function.
// https://en.wikipedia.org/wiki/MurmurHash">https://en.wikipedia.org/wiki/MurmurHash
// - `BoostHash` : C++ based boost::hash
//
// Default is `JavaStringHash`.
HashingScheme
// Set the compression type for the producer.
// By default, message payloads are not compressed. Supported compression types are:
// - LZ4
// - ZLIB
// - ZSTD
// - SNAPPY
//
// Note: ZSTD is supported since Pulsar 2.3. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with ZSTD.
//
// Note: SNAPPY is supported since Pulsar 2.4. Consumers will need to be at least at that
// release in order to be able to receive messages compressed with SNAPPY.
CompressionType
// Set a custom message routing policy by passing an implementation of MessageRouter
// The router is a function that given a particular message and the topic metadata, returns the
// partition index where the message should be routed to
MessageRouter func(Message, TopicMetadata) int
// Control whether automatic batching of messages is enabled for the producer. Default: false [No batching]
//
// When batching is enabled, multiple calls to Producer.sendAsync can result in a single batch to be sent to the
// broker, leading to better throughput, especially when publishing small messages. If compression is enabled,
// messages will be compressed at the batch level, leading to a much better compression ratio for similar headers or
// contents.
//
// When enabled default batch delay is set to 1 ms and default batch size is 1000 messages
Batching bool
// Set the time period within which the messages sent will be batched (default: 10ms) if batch messages are
// enabled. If set to a non zero value, messages will be queued until this time interval or until
BatchingMaxPublishDelay time.Duration
// Set the maximum number of messages permitted in a batch. (default: 1000) If set to a value greater than 1,
// messages will be queued until this threshold is reached or batch interval has elapsed
BatchingMaxMessages uint
}
// The producer is used to publish messages on a topic
type Producer interface {
// return the topic to which producer is publishing to
Topic() string
// return the producer name which could have been assigned by the system or specified by the client
Name() string
// Send a message
// This call will be blocking until is successfully acknowledged by the Pulsar broker.
// Example:
// producer.Send(ctx, pulsar.ProducerMessage{ Payload: myPayload })
// @Deprecated
Send(context.Context, ProducerMessage) error
// Send a message
// This call will be blocking until is successfully acknowledged by the Pulsar broker.
// Example:
// msgID, err := producer.SendAndGetMsgID(ctx, pulsar.ProducerMessage{ Payload: myPayload })
SendAndGetMsgID(context.Context, ProducerMessage) (MessageID, error)
// Send a message in asynchronous mode
// The callback will report back the message being published and
// the eventual error in publishing
// @Deprecated
SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error))
// Send a message in asynchronous mode
// The callback will report back the message being published and
// the eventual error in publishing
SendAndGetMsgIDAsync(context.Context, ProducerMessage, func(MessageID, error))
// Get the last sequence id that was published by this producer.
// This represent either the automatically assigned or custom sequence id (set on the ProducerMessage) that
// was published and acknowledged by the broker.
// After recreating a producer with the same producer name, this will return the last message that was
// published in the previous producer session, or -1 if there no message was ever published.
// return the last sequence id published by this producer.
LastSequenceID() int64
// Flush all the messages buffered in the client and wait until all messages have been successfully
// persisted.
Flush() error
// Close the producer and releases resources allocated
// No more writes will be accepted from this producer. Waits until all pending write request are persisted. In case
// of errors, pending writes will not be retried.
Close() error
Schema() Schema
}