-
Notifications
You must be signed in to change notification settings - Fork 123
/
bmqa_session.h
1144 lines (1060 loc) · 50.7 KB
/
bmqa_session.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
// Copyright 2014-2023 Bloomberg Finance L.P.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
// bmqa_session.h -*-C++-*-
#ifndef INCLUDED_BMQA_SESSION
#define INCLUDED_BMQA_SESSION
//@PURPOSE: Provide access to the BlazingMQ broker.
//
//@CLASSES:
// bmqa::SessionEventHandler: interface for receiving events asynchronously.
// bmqa::Session : mechanism for access to the BlazingMQ broker.
//
//@DESCRIPTION: This component provides a mechanism, 'bmqa::Session', that
// provides access to a message queue broker and an interface,
// 'bmqa::SessionEventHandler' for asynchronous notification of events. The
// broker manages named, persistent queues of messages. This broker allows a
// client to open queues, post messages to them, or retrieve messages from
// them. All of these operations take place within the context of the session
// opened by the client application.
//
// Messages received from a broker are communicated to the application by the
// session associated with that broker in the form of events (see
// 'bmqa_event'). Events can be of two different types: (1) Messages and
// message status events ('bmqa::MessageEvent'), or (2) Session or queue
// status events ('bmqa::SessionEvent').
//
// A 'Session' can dispatch events to the application in either a synchronous
// or asynchronous mode. In synchronous mode, the application must call the
// 'nextEvent' method in order to obtain events from the 'Session'. In
// asynchronous mode, the application must supply a concrete
// 'SessionEventHandler' object at construction time. The concrete
// 'SessionEventHandler' provided by the application must implement the
// 'onSessionEvent' and 'onMessageEvent' methods, which will be called by the
// 'Session' every time a session event or a message event is received. Note
// that by default, a session created in asynchronous mode creates only one
// internal thread to dispatch events, but a different value for number of
// threads can be specified in 'bmqt::SessionOptions'.
//
// A 'Session' is created either in synchronous or in asynchronous mode, and it
// will remain in that mode until destruction. Allowing a mix between
// synchronous and asynchronous modes would make the SDK complicated. The only
// exceptions are the "start" and "open" operations that must be available in
// synchronous or asynchronous version for the convenience of the programmer.
//
// By default a 'Session' connects to the local broker, which in turn may
// connect to a remote cluster based on configuration.
//
// After a 'bmqa::Session' is started, the application has to open one or
// several queues in read and/or write mode.
//
/// Disclaimer
///----------
// A 'Session' object is a heavy object representing the negotiated TCP session
// with the broker, and the entire associated state (opened queues, statistics,
// ...). Therefore, sessions should be always reused if possible, preferably
// with only *one* session per lifetime of a component/library/task.
// Note that at the time of this writing multiplexing of different logical
// sessions over the same physical connection is not supported, so in certain
// circumstances reuse of the same session across the whole of a single
// application will not be possible. For example, if an application uses two
// unrelated libraries both of which use BlazingMQ under the hood, they won't
// be able to share a session as it stands.
// An example of an extreme inefficiency and an abuse of resources is to
// create a session ad-hoc every time a message needs to be posted by the same
// component.
//
/// Thread-safety
///-------------
// This session object is *thread* *enabled*, meaning that two threads can
// safely call any methods on the *same* *instance* without external
// synchronization.
//
/// Connecting to the Broker
///------------------------
// A 'Session' establishes a communication with a broker service using TCP/IP.
// Each 'Session' object must be constructed with a 'bmqt::SessionOptions'
// object, which provides the necessary information to connect to the broker.
// In particular, the 'SessionOptions' object must specify the IP address and
// port needed to connect to the broker. The 'SessionOptions' object may also
// provide extra parameters for tuning the TCP connection behavior (see
// 'bmqt_sessionoptions' for details).
//
// Note that in most cases the user does not need to explicitly construct a
// 'SessionOptions' object: the default constructor for 'SessionOptions'
// creates an instance that will connect to the broker service on the local
// machine using the standard port.
//
// Some options can also be provided using environment variables.
//: o !BMQ_BROKER_URI!: Corresponds to 'SessionOptions::brokerUri'.
//: If this environment variable is set, its value will override the one
//: specified in the 'SessionOptions'.
//
// A 'Session' object is created in an unconnected state. The 'start' or
// 'startAsync' method must be called to connect to the broker. Note that
// 'start' method is blocking, and returns either after connection to broker
// has been established (success), or after specified timeout (failure).
// 'startAsync' method, as the name suggests, connects to the broker
// asynchronously (i.e., it returns immediately), and the result of the
// operation is notified via 'bmqt::SessionEventType::CONNECTED' session event.
//
// When the 'Session' is no longer needed, the application should call the
// 'stop' (blocking) or 'stopAsync' (non-blocking) method to shut down the
// 'Session' and disconnect from the broker. Note that destroying a Session
// automatically stops it. The session can be restarted with a call to 'start'
// or 'startAsync' once it has been fully stopped.
//
/// Connection loss and reconnection
///--------------------------------
// If the connection between the application and the broker is lost, the
// 'Session' will automatically try to reconnect periodically. The 'Session'
// will also notify the application of the event of losing the connection via
// 'bmqt::SessionEventType::CONNECTION_LOST' session event.
//
// Once the connection has been re-established with the broker (as a result of
// one of the periodic reconnection attempts), the 'Session' will notify the
// application via 'bmqt::SessionEventType::RECONNECTED' session event. After
// the connection re-establishment, the 'Session' will attempt to reopen the
// queues that were in 'OPEN' state prior to connection loss. The 'Session'
// will notify the application of the result of reopen operation via
// 'bmqt::SessionEventType::QUEUE_REOPEN_RESULT' for each queue. Note that a
// reopen operation on a queue may fail (due to broker issue, machine issue,
// etc), so the application must keep track of these session events, and stop
// posting on a queue that failed to reopen.
//
// After all reopen operations are complete and application has been notified
// with all 'bmqt::SessionEventType::QUEUE_REOPEN_RESULT' events, the 'Session'
// delivers a 'bmqt::SessionEventType::STATE_RESTORED' session event to the
// application.
//
/// Example 1
///- - - - -
// The following example illustrates how to create a 'Session' in synchronous
// mode, start it, and stop it.
//..
// void runSession()
// {
// bmqt::SessionOptions options;
// options.setBrokerUri("tcp://localhost:30114");
//
// bmqa::Session session(options);
// int res = session.start();
// if (0 != res) {
// bsl::cout << "Failed to start session (" << res << ")"
// << bsl::endl;
// return;
// }
// bsl::cout << "Session started." << bsl::endl;
//
// // Open queue in READ or WRITE or READ/WRITE mode, and receive or
// // post messages, etc.
// // ...
//
// session.stop();
// }
//..
// This example can be simplified because the constructor for 'Session' uses a
// default 'SessionOptions' object that will connect to the local broker
// service. The example may be rewritten as follows:
//..
// void runSession()
// {
// bmqa::Session session; // using default 'SessionOptions'
// int res = session.start();
// if (0 != res) {
// bsl::cout << "Failed to start session (" << res << ")"
// << bsl::endl;
// return;
// }
// bsl::cout << "Session started." << bsl::endl;
//
// // Open queue in READ or WRITE or READ/WRITE mode, and receive or
// // post messages, etc.
// // ...
//
// session.stop();
// }
//..
//
/// Processing session events - synchronous mode
///--------------------------------------------
// If the 'Session' is created in synchronous mode, the application needs to
// call the 'nextEvent' method on a regular basis in order to receive events.
// This method takes an optional wait timeout as a parameter, and it will
// return the next available 'Event' from the session's internal event queue or
// it will block the calling thread execution until new 'Event' arrives or
// until the specified timeout expires. It is safe to call the 'nextEvent'
// method from different threads simultaneously: the 'Session' class provides
// proper synchronization logic to protect the internal event queue from
// corruption in this scenario.
//
/// Example 2
///- - - - -
// The following example demonstrates how to write a function that queries and
// processes events synchronously. In this example the switch form checks the
// type of the 'Event' and performs the necessary actions.
//
// We first define two functions to process 'SessionEvent' and 'MessageEvent'.
// These functions return 'true' if we should keep processing events and
// 'false' otherwise (i.e., no more events are expected from the 'Session').
//..
// bool processSessionEvent(const bmqa::SessionEvent& event)
// {
// bool result = true;
// switch (event.type()) {
//
// case bmqt::SessionEventType::e_CONNECTED:
// // The connection to the broker is established (as a result
// // of a call to the 'start' method).
// openQueues();
// startPostingToQueues();
// break;
//
// case bmqt::SessionEventType::e_DISCONNECTED:
// // The connection to the broker is terminated (as a result
// // of a call to the 'stop' method).
// result = false;
// break;
//
// case bmqt::SessionEventType::e_CONNECTION_LOST:
// // The connection to the broker dropped. Stop posting to the queue.
// stopPostingToQueues();
// break;
//
// case bmqt::SessionEventType::e_STATE_RESTORED:
// // The connection to the broker has been restored (i.e., all queues
// // have been re-opened). Resume posting to the queue.
// resumePostingToQueues();
// break;
//
// case bmqt::SessionEventType::e_CONNECTION_TIMEOUT:
// // The connection to the broker has timed out.
// result = false;
// break;
//
// case bmqt::SessionEventType::e_ERROR:
// // Internal error
// bsl::cout << "Unexpected session error: "
// << event.errorDescription() << bsl::endl;
// break;
//
// } // end switch
//
// return result;
// }
//
// bool processMessageEvent(const bmqa::MessageEvent& event)
// {
// bool result = true;
// switch (event.type()) {
//
// case bmqt::MessageEventType::e_PUSH: {
// // Received a 'PUSH' event from the broker.
// bmqa::MessageIterator msgIter = event.messageIterator();
// while (msgIter.nextMessage()) {
// const bmqa::Message& msg = msgIter.message();
// // Process 'PUSH' msg here (omitted for brevity)
// // ...
// }
// } break;
//
// case bmqt::MessageEventType::e_ACK: {
// // Received an 'ACK' event from the broker.
// bmqa::MessageIterator msgIter = event.messageIterator();
// while (msgIter.nextMessage()) {
// const bmqa::Message& msg = msgIter.message();
// // Process 'ACK' msg here (omitted for brevity)
// // ...
// }
// } break;
//
// } // end switch
//
// return result;
// }
//..
//
// Next, we define a function that handles events synchronously using the
// 'processSessionEvent' and 'processMessageEvent' functions.
//..
// void handleEventsSynchronously(bmqa::Session *startedSession)
// {
// bool more = true;
// while (more) {
// bmqa::Event event =
// startedSession->nextEvent(bsls::TimeInterval(2.0));
// if (event.isSessionEvent()) {
// more = processSessionEvent(event.sessionEvent());
// }
// else {
// more = processMessageEvent(event.messageEvent());
// }
// }
// }
//..
//
/// Processing session events - asynchronous mode
///---------------------------------------------
// If application wishes to use 'Session' in asynchronous mode, it must pass a
// managed pointer to an event handler implementing the 'SessionEventHandler'.
// In this case, when 'Session' is started, a thread pool owned by the
// 'Session' is also started for processing events asynchronously. The
// 'Session' will call event handler's 'onSessionEvent' or 'onMessageEvent'
// method every time a session event or a message event is available.
//
// Note that after the 'Session' is associated with some event handler, this
// association cannot be changed or canceled. The event handler will be used
// for processing events until the 'Session' object is destroyed.
//
/// Example 3
///- - - - -
// The following example demonstrates how to implement an event handler and how
// to make the 'Session' use an instance of this event handler for processing
// events.
//
// First, we define a concrete implementation of 'SessionEventHandler'.
//
//..
// class MyHandler: public bmqa::SessionEventHandler {
// public:
// MyHandler() { }
// virtual ~MyHandler() { }
// virtual void onSessionEvent(const bmqa::SessionEvent& event);
// virtual void onMessageEvent(const bmqa::MessageEvent& event);
// };
//
// void MyHandler::onSessionEvent(const bmqa::SessionEvent& event)
// {
// // The implementation is similar to our 'processSessionEvent' function
// // defined in the previous example.
// processSessionEvent(event);
// }
//
// void MyHandler::onMessageEvent(const bmqa::MessageEvent& event)
// {
// // The implementation is similar to our 'processMessageEvent' function
// // defined in the previous example.
// processMessageEvent(event);
// }
//..
// Next, we define a function that creates a 'Session' using our implementation
// of 'SessionEventHandler'.
//..
// void runAsyncSession()
// {
// bslma::ManagedPtr<bmqa::SessionEventHandler> handlerMp(new MyHandler());
//
// bmqa::Session session(handlerMp); // using default 'SessionOptions'
// int res = session.start();
// if (0 != res) {
// bsl::cout << "Failed to start session (" << res << ")"
// << bsl::endl;
// return;
// }
//
// // ...
//
// session.stop();
// }
//..
//
/// Opening queues
///--------------
// Once the 'Session' has been created and started, the application can use it
// to open queues for producing and/or consuming messages. A queue is
// associated with a domain. Domain metadata must be deployed in the BlazingMQ
// infrastructure prior to opening queues under that domain, because opening a
// queue actually loads the metadata deployed for the associated domain.
//
// The metadata associated with a domain defines various parameters like
// maximum queue size and capacity, persistent policy, routing policy, etc.
//
// Queues are identified by URIs (Uniform Resource Identifiers) that must
// follow the BlazingMQ syntax, manipulated as 'bmqt::Uri' objects. A queue
// URI is typically formatted as follows:
//..
// bmq://my.domain/my.queue
//..
// Note that domain names are unique in BlazingMQ infrastructure, which makes a
// fully qualified queue URI unique too.
//
// Queues in BlazingMQ infrastructure are created by applications on demand.
// The broker creates a queue when it receives an open-queue request from an
// application for a queue that does not exist currently.
//
// Application can open a queue by calling 'openQueue' or 'openQueueAsync'
// method on a started session. Application must pass appropriate flags to
// indicate if it wants to post messages to queue, consume messages from the
// queue, or both.
//
// Note that 'openQueue' is a blocking method, and returns after specified
// queue has been successfully opened (success) or after specified timeout has
// expired (failure). 'openQueueAsync' method, as the name suggests, is non
// blocking, and the result of the operation is notified via
// 'bmqt::SessionEventType::QUEUE_OPEN_RESULT' session event.
//
/// Example 4
///- - - - -
// The following example demonstrates how to open a queue for posting messages.
// The code first opens the queue with appropriate flags, and then uses
// 'bmqa::MessageEventBuilder' to build a message event and post to the queue.
//..
// // Session creation and startup logic elided for brevity
// const char *queueUri = "bmq://my.domain/my.queue";
// bmqa::QueueId myQueueId(1); // ID for the queue
// int rc = session.openQueue(
// &myQueueId,
// queueUri,
// bmqt::QueueFlags::e_WRITE | bmqt::QueueFlags::e_ACK,
// bsls::TimeInterval(30, 0));
//
// if (rc != 0) {
// bsl::cerr << "Failed to open queue, rc: "
// << bmqt::OpenQueueResult::Enum(rc)
// << bsl::endl;
// return;
// }
//..
// Note that apart from 'WRITE' flag, 'ACK' flag has been passed to
// 'openQueue' method above. This indicates that application is interested in
// receiving 'ACK' notification for each message it posts to the queue,
// irrespective of whether or not the message was successfully received by the
// broker and posted to the queue.
//
// Once the queue has been successfully opened for writing, messages can be
// posted to the queue for consumption by interested applications. We will use
// 'bmqa::MessageEventBuilder' to build a message event.
//..
// // Create a message event builder
// bmqa::MessageEventBuilder builder;
// session.loadMessageEventBuilder(&builder);
//
// // Create and post a message event containing 1 message
// bmqa::Message& msg = builder.startMessage();
//
// msg.setCorrelationId(myCorrelationId);
// msg.setDataRef(&myPayload); // where 'myPayload' is of type 'bdlbb::Blob'
// rc = builder.packMessage(myQueueId);
// if (rc != 0) {
// bsl::cerr << "Failed to pack message, rc: "
// << bmqt::EventBuilderResult::Enum(rc)
// << bsl::endl;
// return;
// }
//
// // Post message event
// rc = session.post(builder.messageEvent());
// if (rc != 0) {
// bsl::cerr << "Failed to post message event to the queue, rc: "
// << bmqt::PostResult::Enum(rc)
// << bsl::endl;
// return;
// }
//
// // ... post more messages
//..
//
/// Closing queues
///--------------
// After an application no longer needs to produce or consume messages from a
// queue, it can be closed by 'closeQueue' or 'closeQueueAsync' method. Note
// that closing a queue closes an application's "view" on the queue, and may
// not lead to queue deletion in the broker. A 'Session' does not expose any
// method to explicitly delete a queue.
//
// Note that 'closeQueue' is a blocking method and returns after the specified
// queue has been successfully closed (success) or after specified timeout has
// expired (failure). 'closeQueueAsync', as the name suggests, is a
// non-blocking method, and result of the operation is notified via
// 'bmqt::SessionEventType::e_QUEUE_CLOSE_RESULT' session event.
//
// Each of the open, configure and close queue operations comes in 3 flavors,
// which behave differently with regard to thread blocking and callback
// execution:
///----------------------------------------------------------------------------
//           | openQueue         | openQueueSync        | openQueueAsync
//           | configureQueue    | configureQueueSync   | configureQueueAsync
//           | closeQueue        | closeQueueSync       | closeQueueAsync
//           | (deprecated Sync) | (Synchronous)        | (Asynchronous)
//-----------|-------------------|----------------------|----------------------
// event     | unblocks in       | unblocks in event    | executes callback in
// handler   | internal thread   | handler thread (*)   | event handler thread
//           |                   |                      |
// nextEvent | unblocks in       | unblocks in          | executes callback
//           | internal thread   | internal thread      | in nextEvent thread
//-----------------------------------------------------------------------------
//
// (*) - guarantees unblocking after all previously enqueued events have been
// emitted to the eventHandler, allowing the user to have proper serialization
// of events for the given queue (for example no more PUSH messages will be
// delivered through the eventHandler for the queue after
// configureQueueSync(maxUnconfirmed = 0) returns).
// BMQ
#include <bmqa_abstractsession.h>
#include <bmqa_closequeuestatus.h>
#include <bmqa_configurequeuestatus.h>
#include <bmqa_confirmeventbuilder.h>
#include <bmqa_messageeventbuilder.h>
#include <bmqa_openqueuestatus.h>
#include <bmqt_queueoptions.h>
#include <bmqt_sessionoptions.h>
#include <bmqt_uri.h>
// BDE
#include <ball_log.h>
#include <bsl_memory.h>
#include <bsl_string.h>
#include <bslma_allocator.h>
#include <bslma_managedptr.h>
#include <bslma_usesbslmaallocator.h>
#include <bslmf_nestedtraitdeclaration.h>
#include <bsls_keyword.h>
#include <bsls_timeinterval.h>
#include <bsls_types.h>
namespace BloombergLP {
// FORWARD DECLARATIONS
// Types from other packages that this header only names (for example as the
// target of the smart-pointer members of 'SessionImpl'), so an incomplete
// declaration is used instead of including their headers.
namespace bmqimp {
class Application;
class Event;
}  // close namespace bmqimp

namespace bmqp {
class MessageGUIDGenerator;
}  // close namespace bmqp

namespace bslmt {
class Semaphore;
}  // close namespace bslmt
namespace bmqa {
// FORWARD DECLARATION
// Incomplete declarations of 'bmqa' types named by the declarations that
// follow.  'SessionEvent' and 'MessageEvent' are taken by const reference in
// the 'SessionEventHandler' protocol; the remaining types are presumably used
// by 'Session' methods further down in this header (confirm against the full
// file).
class Event;
class Message;
class MessageEvent;
class MessageProperties;
class QueueId;
class SessionEvent;
// =========================
// class SessionEventHandler
// =========================
/// Abstract protocol (interface) implemented by the application to receive
/// events asynchronously.  An implementation must be thread safe whenever
/// the `Session` is configured to use multiple threads.
class SessionEventHandler {
  public:
    // CREATORS

    /// Destroy this object.
    virtual ~SessionEventHandler();

    // MANIPULATORS

    /// Handle the specified session `event` (for example: connected,
    /// disconnected, queue opened, queue closed).
    virtual void onSessionEvent(const SessionEvent& event) = 0;

    /// Handle the specified message `event`, which carries one or more
    /// messages.
    virtual void onMessageEvent(const MessageEvent& event) = 0;
};
// ==================
// struct SessionImpl
// ==================
/// Holder for the data members of a `Session`, kept as a separate structure
/// so that special tasks such as `bmqadm` can reach them by reinterpret
/// casting a `Session` object.  Such a cast requires care, because `Session`
/// is a polymorphic class.
struct SessionImpl {
    // PUBLIC DATA

    /// The allocator to use.
    bslma::Allocator* d_allocator_p;

    /// Session options, as provided by the application.
    bmqt::SessionOptions d_sessionOptions;

    /// Event handler, if any, to use for notifying the application of
    /// events.
    bslma::ManagedPtr<SessionEventHandler> d_eventHandler_mp;

    /// GUID generator object.
    bsl::shared_ptr<bmqp::MessageGUIDGenerator> d_guidGenerator_sp;

    /// The application object.
    bslma::ManagedPtr<bmqimp::Application> d_application_mp;

  private:
    // NOT IMPLEMENTED

    /// Copy construction and copy assignment are disabled.
    SessionImpl(const SessionImpl&) BSLS_KEYWORD_DELETED;
    SessionImpl& operator=(const SessionImpl&) BSLS_KEYWORD_DELETED;

  public:
    // TRAITS
    BSLMF_NESTED_TRAIT_DECLARATION(SessionImpl, bslma::UsesBslmaAllocator)

    // CREATORS

    /// Create a new object from the specified `options` and `eventHandler`,
    /// using the optionally specified `allocator`.
    SessionImpl(const bmqt::SessionOptions&            options,
                bslma::ManagedPtr<SessionEventHandler> eventHandler,
                bslma::Allocator*                      allocator = 0);
};
// =============
// class Session
// =============
/// A session with a BlazingMQ broker.
class Session : public AbstractSession {
public:
// TYPES
/// Invoked as a response to an asynchronous open queue operation,
/// `OpenQueueCallback` is an alias for a callback function object
/// (functor) that takes as an argument the specified `result`,
/// providing the result and context of the requested operation.
typedef AbstractSession::OpenQueueCallback OpenQueueCallback;
/// Invoked as a response to an asynchronous configure queue operation,
/// `ConfigureQueueCallback` is an alias for a callback function object
/// (functor) that takes as an argument the specified `result`,
/// providing the result and context of the requested operation.
typedef AbstractSession::ConfigureQueueCallback ConfigureQueueCallback;
/// Invoked as a response to an asynchronous close queue operation,
/// `CloseQueueCallback` is an alias for a callback function object
/// (functor) that takes as an argument the specified `result`,
/// providing the result and context of the requested operation.
typedef AbstractSession::CloseQueueCallback CloseQueueCallback;
private:
// CLASS-SCOPE CATEGORY
BALL_LOG_SET_CLASS_CATEGORY("BMQA.SESSION");
private:
// DATA
SessionImpl d_impl; // Sole data member of this object.
private:
// NOT IMPLEMENTED
Session(const Session&) BSLS_KEYWORD_DELETED;
/// Copy constructor and assignment operator are not implemented
Session& operator=(const Session&) BSLS_KEYWORD_DELETED;
public:
// TRAITS
BSLMF_NESTED_TRAIT_DECLARATION(Session, bslma::UsesBslmaAllocator)
// CREATORS
/// Create a new `Session` in *synchronous* mode using the optionally
/// specified `options`. In such mode, events have to be fetched by the
/// application using the `nextEvent()` method. Optionally specify an
/// `allocator` used to supply memory. If `allocator` is 0, the
/// currently installed default allocator is used.
explicit Session(
const bmqt::SessionOptions& options = bmqt::SessionOptions(),
bslma::Allocator* allocator = 0);
/// Create a `Session` in *asynchronous* mode using the specified
/// `eventHandler` as callback for event processing and the optionally
/// specified `options`. Optionally specify an `allocator` used to
/// supply memory. If the optionally specified `allocator` is 0, the
/// currently installed default allocator is used.
explicit Session(
bslma::ManagedPtr<SessionEventHandler> eventHandler,
const bmqt::SessionOptions& options = bmqt::SessionOptions(),
bslma::Allocator* allocator = 0);
/// Stop the `Session` and destroy this object.
~Session() BSLS_KEYWORD_OVERRIDE;
// MANIPULATORS
// (virtual bmqa::AbstractSession)
/// Session management
///------------------
/// Connect to the BlazingMQ broker and start the message processing for
/// this `Session`. This method blocks until either the `Session` is
/// connected to the broker, fails to connect, or the operation times
/// out. If the optionally specified `timeout` is not populated, use
/// the one defined in the session options. Return 0 on success, or a
/// non-zero value corresponding to the `bmqt::GenericResult::Enum` enum
/// values otherwise. The behavior is undefined if this method is
/// called on an already started `Session`.
int start(const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// Connect to the BlazingMQ broker and start the message processing for
/// this `Session`. This method returns without blocking. The result
/// of the operation is communicated with a session event. If the
/// optionally specified `timeout` is not populated, use the one defined
/// in the session options. Return 0 on success (this doesn't imply the
/// session is connected !), or a non-zero value corresponding to the
/// `bmqt::GenericResult::Enum` enum values otherwise. The behavior is
/// undefined if this method is called on an already started `Session`.
int startAsync(const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// Gracefully disconnect from the BlazingMQ broker and stop the
/// operation of this `Session`. This method blocks waiting for all
/// already invoked event handlers to exit and all session-related
/// operations to be finished. No other method but `start()` may be
/// used after this method returns. This method must *NOT* be called if
/// the session is in synchronous mode (i.e., not using the
/// EventHandler); `stopAsync()` should be called in that case instead.
void stop() BSLS_KEYWORD_OVERRIDE;
/// Disconnect from the BlazingMQ broker and stop the operation of this
/// `Session`. This method returns without blocking, and neither
/// enforces nor waits for any already started session-related operation
/// to be finished. No method may be used after this method returns.
void stopAsync() BSLS_KEYWORD_OVERRIDE;
/// **DEPRECATED**
///
/// This method is to be used only if the session is in synchronous mode
/// (i.e., not using the EventHandler); it must be called once all
/// threads getting events with `nextEvent()` have been joined.
void finalizeStop() BSLS_KEYWORD_OVERRIDE;
/// Return a `MessageEventBuilder` that can be used to build a message
/// event for posting on this session. The behavior is undefined unless
/// the session has been successfully started. Note that the lifetime
/// of the returned object is bound by the lifetime of this session
/// instance (i.e., the returned instance cannot outlive this session
/// instance). Also note that the `MessageEventBuilder` objects are
/// pooled, so this operation is cheap, and a `MessageEventBuilder` can
/// be obtained on demand and kept on the stack.
///
/// DEPRECATED: Use `loadMessageEventBuilder` instead. This method
/// will be marked as `BSLS_ANNOTATION_DEPRECATED` in a future
/// release of libbmq.
virtual MessageEventBuilder createMessageEventBuilder();
/// Load into the specified `builder` an instance of
/// `bmqa::MessageEventBuilder` that can be used to build a message
/// event for posting on this session. The behavior is undefined unless
/// the session has been successfully started and `builder` is non-null.
/// Note that the lifetime of the loaded object is bound by the lifetime
/// of this session instance (i.e., the loaded instance cannot outlive
/// this session instance). Also note that the `MessageEventBuilder`
/// objects are pooled, so this operation is cheap, and a
/// `MessageEventBuilder` can be obtained on demand and kept on the
/// stack.
void loadMessageEventBuilder(MessageEventBuilder* builder)
BSLS_KEYWORD_OVERRIDE;
/// Load into the specified `builder` an instance of
/// `bmqa::ConfirmEventBuilder` that can be used to build a batch of
/// CONFIRM messages for sending to the broker. The behavior is
/// undefined unless the session has been successfully started and
/// `builder` is non-null. Note that the lifetime of the loaded object
/// is bound by the lifetime of this session instance (i.e., the loaded
/// instance cannot outlive this session instance).
void loadConfirmEventBuilder(ConfirmEventBuilder* builder)
BSLS_KEYWORD_OVERRIDE;
/// Load into the specified `buffer` an instance of `MessageProperties`
/// that can be used to specify and associate properties while building
/// a `bmqa::Message`. The behavior is undefined unless the session has
/// been successfully started and `buffer` is non-null. Note that the
/// lifetime of the loaded object is bound by the lifetime of this
/// session instance (i.e., the loaded instance cannot outlive this
/// session instance).
void
loadMessageProperties(MessageProperties* buffer) BSLS_KEYWORD_OVERRIDE;
/// Queue management
///----------------
/// Load into the specified `queueId` the queue corresponding to the
/// specified `uri` and return 0 if such a queue was found; otherwise,
/// leave `queueId` untouched and return a non-zero value if no queue
/// corresponding to `uri` is currently open.
int getQueueId(QueueId* queueId,
const bmqt::Uri& uri) const BSLS_KEYWORD_OVERRIDE;
/// Load into the specified `queueId` the queue corresponding to the
/// specified `correlationId` and return 0 if such a queue was found;
/// otherwise, leave `queueId` untouched and return a non-zero value if
/// no queue corresponding to `correlationId` is currently open.
int getQueueId(QueueId* queueId, const bmqt::CorrelationId& correlationId)
const BSLS_KEYWORD_OVERRIDE;
/// DEPRECATED: Use `openQueueSync(QueueId *queueId...)` instead. This
/// method will be marked as `BSLS_ANNOTATION_DEPRECATED` in a
/// future release of libbmq.
int openQueue(QueueId* queueId,
const bmqt::Uri& uri,
bsls::Types::Uint64 flags,
const bmqt::QueueOptions& options = bmqt::QueueOptions(),
const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// Open the queue having the specified `uri` with the specified `flags`
/// (a combination of the values defined in `bmqt::QueueFlags::Enum`),
/// using the specified `queueId` to correlate events related to that
/// queue. The object referred to by `queueId` is modified so that it
/// represents the actual queue uri, flags and options. Return a result
/// providing the status and context of the operation. Use the
/// optionally specified `options` to configure some advanced settings.
/// Note that this operation fails if `queueId` is non-unique. If the
/// optionally specified `timeout` is not populated, use the one defined
/// in the session options. This operation will block until either
/// success, failure, or timing out happens.
///
/// THREAD: Note that calling this method from the event processing
/// thread(s) (i.e., from the EventHandler callback, if
/// provided) *WILL* lead to a *DEADLOCK*.
OpenQueueStatus
openQueueSync(QueueId* queueId,
const bmqt::Uri& uri,
bsls::Types::Uint64 flags,
const bmqt::QueueOptions& options = bmqt::QueueOptions(),
const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// DEPRECATED: Use `openQueue(QueueId *queueId...)` instead. This
/// method will be marked as `BSLS_ANNOTATION_DEPRECATED` in a
/// future release of libbmq. Note that the
/// `openQueue(QueueId *queueId...)` overload is itself
/// documented as deprecated in favor of
/// `openQueueSync(QueueId *queueId...)`.
virtual int
openQueue(const QueueId& queueId,
const bmqt::Uri& uri,
bsls::Types::Uint64 flags,
const bmqt::QueueOptions& options = bmqt::QueueOptions(),
const bsls::TimeInterval& timeout = bsls::TimeInterval());
/// DEPRECATED: Use the callback flavor of `openQueueAsync(...)`
/// instead. This method will be marked as
/// `BSLS_ANNOTATION_DEPRECATED` in a future release of
/// libbmq.
int
openQueueAsync(QueueId* queueId,
const bmqt::Uri& uri,
bsls::Types::Uint64 flags,
const bmqt::QueueOptions& options = bmqt::QueueOptions(),
const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// Asynchronously open the queue having the specified `uri` with the
/// specified `flags` (a combination of the values defined in
/// `bmqt::QueueFlags::Enum`), using the specified `queueId` to
/// correlate events related to that queue and the optionally specified
/// `options` to configure some advanced settings. The object referred
/// to by `queueId` is modified so that it represents the actual queue
/// uri, flags and options. The result of the operation is communicated
/// to the specified `callback` via a `bmqa::OpenQueueStatus`, providing
/// the status and context of the requested operation. Note that this
/// operation fails if `queueId` is non-unique. If the optionally
/// specified `timeout` is not populated, use the one defined in the
/// session options.
///
/// THREAD: The `callback` will *ALWAYS* be invoked from the
/// EventHandler thread(s) (or, if a SessionEventHandler was not
/// specified, from the thread invoking `nextEvent`).
void
openQueueAsync(QueueId* queueId,
const bmqt::Uri& uri,
bsls::Types::Uint64 flags,
const OpenQueueCallback& callback,
const bmqt::QueueOptions& options = bmqt::QueueOptions(),
const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// DEPRECATED: Use `configureQueueSync(const QueueId *queueId...)`
/// instead. This method will be marked as
/// `BSLS_ANNOTATION_DEPRECATED` in a future release of
/// libbmq.
int configureQueue(QueueId* queueId,
const bmqt::QueueOptions& options,
const bsls::TimeInterval& timeout =
bsls::TimeInterval()) BSLS_KEYWORD_OVERRIDE;
/// Configure the queue identified by the specified `queueId` using the
/// specified `options` and return a result providing the status and
/// context of the operation. If the optionally specified `timeout` is
/// not populated, use the one defined in the session options. This
/// operation returns an error if there is a pending configure for the
/// same queue. This operation will block until either success,
/// failure, or timing out happens.
///
/// Note that the following `bmqt::QueueOptions` fields cannot be
/// reconfigured after the queue has been opened:
/// - suspendsOnBadHostHealth
/// Attempts to reconfigure these fields will yield an `e_NOT_SUPPORTED`
/// error code.
///
/// THREAD: Note that calling this method from the event processing
/// thread(s) (i.e., from the EventHandler callback, if
/// provided) *WILL* lead to a *DEADLOCK*.
ConfigureQueueStatus
configureQueueSync(const QueueId* queueId,
const bmqt::QueueOptions& options,
const bsls::TimeInterval& timeout =
bsls::TimeInterval()) BSLS_KEYWORD_OVERRIDE;
/// DEPRECATED: Use the callback flavor of `configureQueueAsync(...)`
/// instead. This method will be marked as
/// `BSLS_ANNOTATION_DEPRECATED` in a future release of
/// libbmq.
int configureQueueAsync(QueueId* queueId,
const bmqt::QueueOptions& options,
const bsls::TimeInterval& timeout =
bsls::TimeInterval()) BSLS_KEYWORD_OVERRIDE;
/// Asynchronously configure the queue identified by the specified
/// `queueId` using the specified `options` to configure some advanced
/// settings. The result of the operation is communicated to the
/// specified `callback` via a `bmqa::ConfigureQueueStatus`, providing
/// the status and context of the requested operation. If the
/// optionally specified `timeout` is not populated, use the one defined
/// in the session options.
///
/// Note that the following `bmqt::QueueOptions` fields cannot be
/// reconfigured after the queue has been opened:
/// - suspendsOnBadHostHealth
/// Attempts to reconfigure these fields will yield an `e_NOT_SUPPORTED`
/// error code.
///
/// THREAD: The `callback` will *ALWAYS* be invoked from the
/// EventHandler thread(s) (or, if a SessionEventHandler was not
/// specified, from the thread invoking `nextEvent`).
void configureQueueAsync(const QueueId* queueId,
const bmqt::QueueOptions& options,
const ConfigureQueueCallback& callback,
const bsls::TimeInterval& timeout =
bsls::TimeInterval()) BSLS_KEYWORD_OVERRIDE;
/// DEPRECATED: Use `closeQueueSync(const QueueId *queueId...)` instead.
/// This method will be marked as
/// `BSLS_ANNOTATION_DEPRECATED` in a future release of
/// libbmq.
int closeQueue(QueueId* queueId,
const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// Close the queue identified by the specified `queueId` and return a
/// result providing the status and context of the operation. If the
/// optionally specified `timeout` is not populated, use the one defined
/// in the session options. Any outstanding configureQueue request for
/// this `queueId` will be canceled. This operation will block until
/// either success, failure, or timing out happens. Once this method
/// returns, there is a guarantee that no more messages and events for
/// this `queueId` will be received. Note that successful processing of
/// this request in the broker closes this session's view of the queue;
/// the underlying queue may not be deleted in the broker. When this
/// method returns, the correlationId associated with the queue is
/// cleared.
///
/// THREAD: Note that calling this method from the event processing
/// thread(s) (i.e., from the EventHandler callback, if
/// provided) *WILL* lead to a *DEADLOCK*.
CloseQueueStatus
closeQueueSync(const QueueId* queueId,
const bsls::TimeInterval& timeout = bsls::TimeInterval())
BSLS_KEYWORD_OVERRIDE;
/// DEPRECATED: Use `closeQueue(QueueId *queueId...)` instead. This
/// method will be marked as `BSLS_ANNOTATION_DEPRECATED` in a
/// future release of libbmq. Note that the
/// `closeQueue(QueueId *queueId...)` overload is itself
/// documented as deprecated in favor of
/// `closeQueueSync(const QueueId *queueId...)`.
virtual int
closeQueue(const QueueId& queueId,
const bsls::TimeInterval& timeout = bsls::TimeInterval());
/// DEPRECATED: Use the callback flavor of `closeQueueAsync(...)`
/// instead. This method will be marked as
/// `BSLS_ANNOTATION_DEPRECATED` in a future release of
/// libbmq.
int closeQueueAsync(QueueId* queueId,
const bsls::TimeInterval& timeout =
bsls::TimeInterval()) BSLS_KEYWORD_OVERRIDE;
/// Asynchronously close the queue identified by the specified
/// `queueId`. Any outstanding configureQueue requests will be
/// canceled. The result of the operation is communicated to the
/// specified `callback` via a `bmqa::CloseQueueStatus`, providing the