-
Notifications
You must be signed in to change notification settings - Fork 123
/
bmqp_pusheventbuilder.h
357 lines (305 loc) · 13.4 KB
/
bmqp_pusheventbuilder.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// bmqp_pusheventbuilder.h -*-C++-*-
#ifndef INCLUDED_BMQP_PUSHEVENTBUILDER
#define INCLUDED_BMQP_PUSHEVENTBUILDER
//@PURPOSE: Provide a mechanism to build a BlazingMQ 'PUSH' event.
//
//@CLASSES:
// bmqp::PushEventBuilder: mechanism to build a BlazingMQ PUSH event.
//
//@DESCRIPTION: 'bmqp::PushEventBuilder' provides a mechanism to build a
// 'PushEvent'. Such event starts by an 'EventHeader', followed by one or many
// repetitions of a pair of 'PushHeader' + message payload. A
// 'PushEventBuilder' can be reused to build multiple Events, by calling the
// 'reset()' method on it.
//
/// Padding
///-------
// Each message added to the PushEvent is padded, so that multiple messages can
// be added in the same event, without impacting the alignment of the headers.
//
/// Thread Safety
///-------------
// NOT thread safe
//
/// Usage
///-----
//..
// bdlbb::PooledBlobBufferFactory bufferFactory(1024, d_allocator_p);
// bmqp::PushEventBuilder builder(&bufferFactory, d_allocator_p);
//
// // Append multiple messages
// builder.packMessage("hello", 5, 0, bmqt::MessageGUID());
// builder.packMessage(myBlob, 0, bmqt::MessageGUID());
//
// const bdlbb::Blob& eventBlob = builder.blob();
// // Send the blob ...
//
// // We can reset the builder to reuse it; note that this invalidates the
// // 'eventBlob' retrieved above
// builder.reset();
//..
// BMQ
#include <bmqp_optionutil.h>
#include <bmqp_protocol.h>
#include <bmqp_protocolutil.h>
#include <bmqt_messageguid.h>
#include <bmqt_resultcode.h>
// MWC
#include <mwcu_blobobjectproxy.h>
// BDE
#include <bdlbb_blob.h>
#include <bslma_allocator.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_assert.h>
#include <bsls_cpp11.h>
namespace BloombergLP {
namespace bmqp {
// ======================
// class PushEventBuilder
// ======================
/// Mechanism to build a BlazingMQ PUSH event
class PushEventBuilder {
private:
// DATA
bslma::Allocator* d_allocator_p;
// Allocator to use.
mutable bdlbb::Blob d_blob;
// blob being built by this
// PushEventBuilder.
// This has been done mutable to be
// able to skip writing the length
// until the blob is retrieved.
int d_msgCount;
// number of messages currently in
// the event.
OptionUtil::OptionsBox d_options;
// Provides information and operations
// associated with the options of the
// current (to-be-packed) message.
mwcu::BlobObjectProxy<PushHeader> d_currPushHeader;
// Push Header associated with the
// current (to-be-packed) message.
private:
// PRIVATE MANIPULATORS
/// Add a message to the event being built, having the specified
/// `queueId` and `msgId` and `payload` and the specified `flags` and
/// the specified `compressionAlgorithmType`. Use the specified
/// `propertiesLogic` to encode MessageProperties related flag and
/// Schema Id.
/// Return 0 on success, or a meaningful non-zero error code otherwise.
/// In case of failure, this method has no effect on the underlying
/// event blob.
bmqt::EventBuilderResult::Enum packMessageImp(
const bdlbb::Blob& payload,
int queueId,
const bmqt::MessageGUID& msgId,
int flags,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
const MessagePropertiesInfo& messagePropertiesInfo);
/// Check if `PushHeader` has been written for current event. If it
/// hasn't, add the `PushHeader`. The behavior is undefined unless
/// there's enough space reserved on the underlying event blob.
void ensurePushHeader();
/// Erase from the event being built the PushHeader and options that
/// have been added to the current (to-be-packed) message, if these were
/// written to the underlying blob. Return 0 on success, and non-zero
/// on error.
int eraseCurrentMessage();
private:
// NOT IMPLEMENTED
PushEventBuilder(const PushEventBuilder&) BSLS_CPP11_DELETED;
/// Copy constructor and assignment operator not implemented
PushEventBuilder& operator=(const PushEventBuilder&) BSLS_CPP11_DELETED;
public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(PushEventBuilder, bslma::UsesBslmaAllocator)
// CREATORS
/// Create a new `PushEventBuilder` using the specified `bufferFactory`
/// and `allocator` for the blob.
PushEventBuilder(bdlbb::BlobBufferFactory* bufferFactory,
bslma::Allocator* allocator);
// MANIPULATORS
/// Reset this builder to an initial state so that it can be used to
/// build a new `PushEvent`. Note that calling reset invalidates the
/// content of the blob returned by the `blob()` method. Return 0 on
/// success, or non-zero on error.
int reset();
/// Add a message to the event being built, having the specified
/// `queueId`, `msgId`, `payload`, `flags` and
/// `compressionAlgorithmType`. Use the specified `propertiesLogic` to
/// encode MessageProperties related flag and Schema Id.
/// Return 0 on success, or a meaningfulnon-zero error code otherwise.
/// In case of failure, this method has no effect on the underlying
/// event blob.
bmqt::EventBuilderResult::Enum
packMessage(const bdlbb::Blob& payload,
int queueId,
const bmqt::MessageGUID& msgId,
int flags,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
const MessagePropertiesInfo& messagePropertiesInfo =
bmqp::MessagePropertiesInfo());
/// Add a message to the event being built, having the specified
/// `queueId`, `msgId`, `flags` and `compressionAlgorithmType`. Note
/// that since no payload is specified, in addition to `flags`, the
/// `implicit payload` flag will also be set. Use the specified
/// `propertiesLogic` to encode MessageProperties related flag and
/// Schema Id.
/// Return 0 on success, or a meaningful non-zero error code otherwise.
/// In case of failure, this method has no effect on the underlying
/// event blob.
bmqt::EventBuilderResult::Enum
packMessage(int queueId,
const bmqt::MessageGUID& msgId,
int flags,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
const MessagePropertiesInfo& messagePropertiesInfo =
bmqp::MessagePropertiesInfo());
/// Add a message to the event being built, having the specified
/// `payload` and `header`. Return 0 on success, or a meaningful
/// non-zero error code otherwise. In case of failure, this method has
/// no effect on the underlying event blob.
bmqt::EventBuilderResult::Enum packMessage(const bdlbb::Blob& payload,
const PushHeader& header);
/// Add a SubQueueId option having the specified `subQueueIds` to the
/// current (to-be-packed) message in the event being built. Return 0
/// on success, or a meaningful non-zero error code otherwise. In case
/// of failure, this method has no effect on the underlying event blob.
bmqt::EventBuilderResult::Enum
addSubQueueIdsOption(const Protocol::SubQueueIdsArrayOld& subQueueIds);
/// Add a SubQueueInfo option having the specified `subQueueInfos` to
/// the current (to-be-packed) message in the event being built. If the
/// specified `packRdaCounter` is true, also pack the remaining delivery
/// attempts counter (`rdaCounter`). If `packRdaCounter` is true and
/// `subQueueInfos` contains only the default SubQueueId, pack a special
/// option header carrying information about the RDA counter. Return 0
/// on success, or a meaningful non-zero error code otherwise. In case
/// of failure, this method has no effect on the underlying event blob.
bmqt::EventBuilderResult::Enum
addSubQueueInfosOption(const Protocol::SubQueueInfosArray& subQueueInfos,
bool packRdaCounter = true);
/// Add a Group Id option having the specified `msgGroupId` to the
/// current (to-be-packed) message in the event being built. Return 0
/// on success, or a meaningful non-zero error code otherwise. In case
/// of failure, this method has no effect on the underlying event blob.
bmqt::EventBuilderResult::Enum
addMsgGroupIdOption(const Protocol::MsgGroupId& msgGroupId);
// ACCESSORS
/// Return the current size of the event being built. Note that this
/// size excludes the PushHeader and options of a message that has not
/// been packed with a call to one of the `packMessage` methods.
int eventSize() const;
/// Return the number of messages currently in the event being built.
int messageCount() const;
/// Return a reference not offering modifiable access to the blob built
/// by this event. If no messages were added, this will return a blob
/// composed only of an `EventHeader`.
const bdlbb::Blob& blob() const;
};
// ============================================================================
// INLINE DEFINITIONS
// ============================================================================
// ----------------------
// class PushEventBuilder
// ----------------------
// PRIVATE MANIPULATORS
inline int PushEventBuilder::eraseCurrentMessage()
{
// PRECONDITIONS
const int optionsSize = d_options.size();
#ifdef BSLS_ASSERT_SAFE_IS_ACTIVE
bool hasNoOptions = optionsSize == 0 && !d_currPushHeader.isSet();
bool hasOptions = optionsSize > 0 && d_currPushHeader.isSet();
bool isValidBlobSize = d_blob.length() >
(static_cast<int>(sizeof(PushHeader)) +
optionsSize);
BSLS_ASSERT_SAFE(hasNoOptions || (hasOptions && isValidBlobSize));
#endif
if (optionsSize > 0) {
// We previously wrote a PushHeader and options to the current message.
d_currPushHeader.reset();
// Flush any buffered changes if necessary, and make this object
// not refer to any valid blob object.
d_blob.setLength(d_blob.length() - sizeof(PushHeader) - optionsSize);
d_options.reset();
}
return 0;
}
// MANIPULATORS
inline bmqt::EventBuilderResult::Enum
PushEventBuilder::packMessage(const bdlbb::Blob& payload,
const PushHeader& header)
{
return packMessageImp(payload,
header.queueId(),
header.messageGUID(),
header.flags(),
header.compressionAlgorithmType(),
MessagePropertiesInfo(header));
}
inline bmqt::EventBuilderResult::Enum PushEventBuilder::packMessage(
const bdlbb::Blob& payload,
int queueId,
const bmqt::MessageGUID& msgId,
int flags,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
const MessagePropertiesInfo& messagePropertiesInfo)
{
return packMessageImp(payload,
queueId,
msgId,
flags,
compressionAlgorithmType,
messagePropertiesInfo);
}
inline bmqt::EventBuilderResult::Enum PushEventBuilder::packMessage(
int queueId,
const bmqt::MessageGUID& msgId,
int flags,
bmqt::CompressionAlgorithmType::Enum compressionAlgorithmType,
const MessagePropertiesInfo& messagePropertiesInfo)
{
const bdlbb::Blob emptyBlob;
PushHeaderFlagUtil::setFlag(&flags, PushHeaderFlags::e_IMPLICIT_PAYLOAD);
return packMessageImp(emptyBlob,
queueId,
msgId,
flags,
compressionAlgorithmType,
messagePropertiesInfo);
}
// ACCESSORS
inline int PushEventBuilder::eventSize() const
{
// PRECONDITIONS
const int optionsCount = d_options.optionsCount();
BSLS_ASSERT_SAFE((optionsCount > 0 && d_currPushHeader.isSet()) ||
(optionsCount == 0 && !d_currPushHeader.isSet()));
if (optionsCount > 0) {
return d_blob.length() - sizeof(PushHeader) - d_options.size();
// RETURN
}
return d_blob.length();
}
inline int PushEventBuilder::messageCount() const
{
return d_msgCount;
}
} // close package namespace
} // close enterprise namespace
#endif