/
ActiveMQConnectionSupport.h
370 lines (312 loc) · 11.3 KB
/
ActiveMQConnectionSupport.h
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_
#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_
#include <cms/ExceptionListener.h>
#include <activemq/util/Config.h>
#include <activemq/exceptions/ActiveMQException.h>
#include <activemq/transport/Transport.h>
#include <activemq/transport/TransportListener.h>
#include <activemq/util/LongSequenceGenerator.h>
#include <decaf/util/Properties.h>
#include <decaf/lang/Exception.h>
#include <decaf/lang/Pointer.h>
#include <memory>
namespace activemq {
namespace core {
using decaf::lang::Pointer;
class AMQCPP_API ActiveMQConnectionSupport :
public transport::TransportListener
{
private:
// Properties used to configure this connection.
Pointer<decaf::util::Properties> properties;
// Transport we are using
Pointer<transport::Transport> transport;
/**
* Boolean indicating that we are to always send message Synchronously.
* This overrides the sending on non-persistent messages or transacted
* messages asynchronously, also fully overrides the useAsyncSend flag.
*/
bool alwaysSyncSend;
/**
* Boolean indicating that we are to send any messages that we would normally
* send synchronously using an asynchronous send. Normally we send all the
* persistent messages not in a transaction synchronously and all others are
* sent asynchronously. Only applied though is alwaysSyncSend is false.
*/
bool useAsyncSend;
/**
* Send Timeout, forces all messages to be sent Synchronously.
*/
unsigned int sendTimeout;
/**
* Close Timeout, time to wait for a Closed message from the broker before
* giving up and just shutting down the connection.
*/
unsigned int closeTimeout;
/**
* Producer Window Size, amount of memory that can be used before the producer
* blocks and waits for ProducerAck messages.
*/
unsigned int producerWindowSize;
/**
* The configured User Name
*/
std::string username;
/**
* The configured Password
*/
std::string password;
/**
* The configured Client Id
*/
std::string clientId;
/**
* Next available Producer Id
*/
util::LongSequenceGenerator producerIds;
/**
* Next available Producer Sequence Id
*/
util::LongSequenceGenerator producerSequenceIds;
/**
* Next available Consumer Id
*/
util::LongSequenceGenerator consumerIds;
/**
* Next available Transaction Id
*/
util::LongSequenceGenerator transactionIds;
/**
* Next available Session Id.
*/
util::LongSequenceGenerator sessionIds;
/**
* Next Temporary Destination Id
*/
util::LongSequenceGenerator tempDestinationIds;
public:
/**
* Creates an instance of the ActiveMQConnectionSupport class, the
* most common properties for a connection are pulled from the
* properties instance or are set to defaults.
*
* @param properties
* The URI configured properties for this connection.
*/
ActiveMQConnectionSupport( const Pointer<transport::Transport>& transport,
const Pointer<decaf::util::Properties>& properties );
virtual ~ActiveMQConnectionSupport();
/**
* Starts the Transport, this should initiate the connection between
* this client and the Transports endpoint.
* @throws Exception
*/
virtual void startupTransport() throw( decaf::lang::Exception );
/**
* Closes this object and deallocates the appropriate resources.
* The object is generally no longer usable after calling close.
* @throws Exception
*/
virtual void shutdownTransport() throw( decaf::lang::Exception );
/**
* Gets the Properties object that this Config object was initialized with.
* @returns a const reference to the Connection Config.
*/
const decaf::util::Properties& getProperties() const {
return *( this->properties.get() );
}
/**
* Gets the Transport Configured for this Connection.
* @return the configured transport
*/
transport::Transport& getTransport() const {
return *( this->transport.get() );
}
/**
* Gets if the Connection should always send things Synchronously.
* @return true if sends should always be Synchronous.
*/
bool isAlwaysSyncSend() const {
return this->alwaysSyncSend;
}
/**
* Sets if the Connection should always send things Synchronously.
* @param value
* true if sends should always be Synchronous.
*/
void setAlwaysSyncSend( bool value ) {
this->alwaysSyncSend = value;
}
/**
* Gets if the useAsyncSend option is set
* @returns true if on false if not.
*/
bool isUseAsyncSend() const {
return this->useAsyncSend;
}
/**
* Sets the useAsyncSend option
* @param value - true to activate, false to disable.
*/
void setUseAsyncSend( bool value ) {
this->useAsyncSend = value;
}
/**
* Gets the assigned send timeout for this Connector
* @return the send timeout configured in the connection uri
*/
unsigned int getSendTimeout() const {
return this->sendTimeout;
}
/**
* Sets the send timeout to use when sending Message objects, this will
* cause all messages to be sent using a Synchronous request is non-zero.
* @param timeout - The time to wait for a response.
*/
void setSendTimeout( unsigned int timeout ) {
this->sendTimeout = timeout;
}
/**
* Gets the assigned close timeout for this Connector
* @return the close timeout configured in the connection uri
*/
unsigned int getCloseTimeout() const {
return this->closeTimeout;
}
/**
* Sets the close timeout to use when sending the disconnect request.
* @param timeout - The time to wait for a close message.
*/
void setCloseTimeout( unsigned int timeout ) {
this->closeTimeout = timeout;
}
/**
* Gets the configured producer window size for Producers that are created
* from this connector. This only applies if there is no send timeout and the
* producer is able to send asynchronously.
* @return size in bytes of messages that this producer can produce before
* it must block and wait for ProducerAck messages to free resources.
*/
unsigned int getProducerWindowSize() const {
return this->producerWindowSize;
}
/**
* Sets the size in Bytes of messages that a producer can send before it is blocked
* to await a ProducerAck from the broker that frees enough memory to allow another
* message to be sent.
* @param windowSize - The size in bytes of the Producers memory window.
*/
void setProducerWindowSize( unsigned int windowSize ) {
this->producerWindowSize = windowSize;
}
/**
* Gets the Configured Username.
* @return the username.
*/
std::string getUsername() const {
return this->username;
}
/**
* Sets the Username.
* @param username - The new username value.
*/
void setUsername( const std::string& username ) {
this->username = username;
}
/**
* Gets the Configured Password.
* @return the password.
*/
std::string getPassword() const {
return this->password;
}
/**
* Sets the Password.
* @param password - The new password value.
*/
void setPassword( const std::string& password ) {
this->password = password;
}
/**
* Gets the Configured Client Id.
* @return the clientId.
*/
std::string getClientId() const {
return this->clientId;
}
/**
* Sets the Client Id.
* @param clientId - The new clientId value.
*/
void setClientId( const std::string& clientId ) {
this->clientId = clientId;
}
/**
* Get the Next available Producer Id
* @return the next id in the sequence.
*/
long long getNextProducerId() {
return this->producerIds.getNextSequenceId();
}
/**
* Get the Next available Producer Sequence Id
* @return the next id in the sequence.
*/
long long getNextProducerSequenceId() {
return this->producerSequenceIds.getNextSequenceId();
}
/**
* Get the Next available Consumer Id
* @return the next id in the sequence.
*/
long long getNextConsumerId() {
return this->consumerIds.getNextSequenceId();
}
/**
* Get the Next available Transaction Id
* @return the next id in the sequence.
*/
long long getNextTransactionId() {
return this->transactionIds.getNextSequenceId();
}
/**
* Get the Next available Session Id.
* @return the next id in the sequence.
*/
long long getNextSessionId() {
return this->sessionIds.getNextSequenceId();
}
/**
* Get the Next Temporary Destination Id
* @return the next id in the sequence.
*/
long long getNextTempDestinationId() {
return this->tempDestinationIds.getNextSequenceId();
}
/**
* The transport has suffered an interruption from which it hopes to recover
*/
virtual void transportInterrupted() {}
/**
* The transport has resumed after an interruption
*/
virtual void transportResumed() {}
};
}}
#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTIONSUPPORT_H_*/