-
Notifications
You must be signed in to change notification settings - Fork 33
/
NoClusterManager.java
558 lines (502 loc) · 18.9 KB
/
NoClusterManager.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
/*
* Copyright (c) 2000, 2017 Oracle and/or its affiliates. All rights reserved.
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License v. 2.0, which is available at
* http://www.eclipse.org/legal/epl-2.0.
*
* This Source Code may also be made available under the following Secondary
* Licenses when the conditions for such availability set forth in the
* Eclipse Public License v. 2.0 are satisfied: GNU General Public License,
* version 2 with the GNU Classpath Exception, which is available at
* https://www.gnu.org/software/classpath/license.html.
*
* SPDX-License-Identifier: EPL-2.0 OR GPL-2.0 WITH Classpath-exception-2.0
*/
/*
*/
package com.sun.messaging.jmq.jmsserver.cluster.api;
import java.util.*;
import java.net.UnknownHostException;
import java.net.MalformedURLException;
import com.sun.messaging.jmq.io.MQAddress;
import com.sun.messaging.jmq.util.log.*;
import com.sun.messaging.jmq.util.UID;
import com.sun.messaging.jmq.jmsserver.Globals;
import com.sun.messaging.jmq.jmsserver.util.BrokerException;
import org.jvnet.hk2.annotations.Service;
import jakarta.inject.Singleton;
/**
*/
@Service(name = "com.sun.messaging.jmq.jmsserver.cluster.api.NoClusterManager")
@Singleton
public class NoClusterManager implements ClusterManager {
private static boolean DEBUG = false;
private Logger logger = Globals.getLogger();
// private BrokerResources br = Globals.getBrokerResources();
private boolean initialized = false;
/**
* the local broker.
*/
private ClusteredBroker localcb = null;
/**
* The id of the cluster.
*/
private String clusterid = Globals.getClusterID();
/**
* Retrieves the cluster id associated with this cluster.
*
* @return the id or null if this is not an HA cluster
*/
@Override
public String getClusterId() {
return clusterid;
}
/**
* Changes the host/port of the local broker.
*
* @param address MQAddress to the portmapper
* @throws BrokerException if something goes wrong when the address is changed
*/
@Override
public void setMQAddress(MQAddress address) throws Exception {
if (!initialized) {
initialize(address);
return;
}
ClusteredBroker cb = getLocalBroker();
cb.setBrokerURL(address);
}
@Override
public int getClusterPingInterval() {
return CLUSTER_PING_INTERVAL_DEFAULT;
}
/**
* Retrieves the host/port of the local broker.
*
* @return the MQAddress to the portmapper
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public MQAddress getMQAddress() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
return getLocalBroker().getBrokerURL();
}
/**
* Sets a listener for notification when the state/status or configuration of the cluster changes.
*
* <p>
* This api is used by the Monitor Service to determine when a broker should be monitored because it may be down.
*
* @see ClusterListener
* @param listener the listener to add
*/
@Override
public void addEventListener(ClusterListener listener) {
}
/**
* Removes a listener for notification when the state changes.
*
* <p>
* This api is used by the Monitor Service to determine when a broker should be monitored because it may be down.
*
* @return true if the item existed and was removed.
* @see ClusterListener
* @param listener the listener to remove
*/
@Override
public boolean removeEventListener(ClusterListener listener) {
return true;
}
/**
* Retrieves the ClusteredBroker which represents this broker.
*
* @return the local broker
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
* @see ClusterManagerImpl#getBroker(String)
*/
@Override
public ClusteredBroker getLocalBroker() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
return localcb;
}
/**
* Returns the current number of brokers in the cluster. In a non-ha cluster, this includes all brokers which have a
* BrokerLink to the local broker and the local broker.
*
* @return count of all brokers in the cluster.
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public int getKnownBrokerCount() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
return 1;
}
/**
* Returns the current number of brokers in the configuration propperties. In a non-ha cluster, this includes all
* brokers listed by -cluster or the cluster property.
*
* @return count of configured brokers in the cluster.
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public int getConfigBrokerCount() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
return 1;
}
/**
* Returns the current number of brokers in the cluster. In a non-ha cluster, this includes all brokers which have an
* active BrokerLink to the local broker and the local broker.
*
* @return count of all brokers in the cluster.
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public int getActiveBrokerCount() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
return 1;
}
/**
* Returns an iterator of ClusteredBroker objects for all brokers in the cluster. This is a copy of the current list.
*
* @param refresh if true refresh current list then return it
* @return iterator of ClusteredBrokers
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public Iterator getKnownBrokers(boolean refresh) {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
List l = new ArrayList();
l.add(localcb);
return l.iterator();
}
/**
* Returns an iterator of ClusteredBroker objects for all brokers in the cluster. This is a copy of the current list and
* is accurate at the time getBrokers was called.
*
* @return iterator of ClusteredBrokers
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public Iterator getConfigBrokers() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
List l = new ArrayList();
l.add(localcb);
return l.iterator();
}
/**
* Returns an iterator of ClusteredBroker objects for all active brokers in the cluster. This is a copy of the current
* list and is accurate at the time getBrokers was called.
*
* @return iterator of ClusteredBrokers
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public Iterator getActiveBrokers() {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
List l = new ArrayList();
l.add(localcb);
return l.iterator();
}
/**
* Returns a specific ClusteredBroker object by name.
*
* @param brokerid the id associated with the broker
* @return the broker associated with brokerid or null if the broker is not found
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public ClusteredBroker getBroker(String brokerid) {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
if (localcb.getBrokerName().equals(brokerid)) {
return localcb;
}
return null;
}
/**
* Method used in a dynamic cluster, it updates the system when a new broker is added.
*
* @param URL the MQAddress of the new broker
* @param uid the brokerSessionUID associated with this broker (if known)
* @param instName the instance name of the broker to be activated
* @param userData optional data associated with the status change
* @throws NoSuchElementException if the broker can not be added to the cluster (for example if the cluster is running
* in HA mode and the URL is not in the shared database)
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
* @return the uid associated with the new broker
*/
@Override
public String activateBroker(MQAddress URL, UID uid, String instName, Object userData) throws BrokerException {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".activateBroker(" + URL + " ...)");
}
/**
* method used in a all clusters, it updates the system when a new broker is added.
*
* @param brokerid the id of the broker (if known)
* @param uid the broker sessionUID
* @param instName the broker instance name
* @param userData optional data associated with the status change
* @throws NoSuchElementException if the broker can not be added to the cluster (for example if the cluster is running
* in HA mode and the brokerid is not in the shared database)
* @throws BrokerException if the database can not be accessed
* @return the uid associated with the new broker
*/
@Override
public String activateBroker(String brokerid, UID uid, String instName, Object userData) throws BrokerException {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".activateBroker(" + brokerid + " ...)");
}
/**
* method used in a dynamic cluster, it updates the system when a broker is removed.
*
* @param URL the MQAddress associated with the broker
* @param userData optional data associated with the status change
* @throws NoSuchElementException if the broker can not be found in the cluster.
*/
@Override
public void deactivateBroker(MQAddress URL, Object userData) {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".deactivateBroker(" + URL + " ...)");
}
/**
* Method used in a dynamic cluster, it updates the system when a broker is removed.
*
* @param brokerid the id associated with the broker
* @param userData optional data associated with the status change
* @throws NoSuchElementException if the broker can not be found in the cluster.
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public void deactivateBroker(String brokerid, Object userData) {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".deactivateBroker(" + brokerid + " ..)");
}
/**
* Finds the brokerid associated with the given host/port.
*
* @param broker the MQAddress of the new broker
* @return the id associated with the broker or null if the broker does not exist
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public String lookupBrokerID(MQAddress broker) {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
MQAddress addr = localcb.getBrokerURL();
if (addr.equals(broker)) {
return localcb.getBrokerName();
}
return null;
}
/**
* finds the brokerid associated with the given session.
*
* @param uid is the session uid to search for
* @return the uid associated with the session or null we cant find it.
*/
@Override
public String lookupStoreSessionOwner(UID uid) {
return null;
}
@Override
public String getStoreSessionCreator(UID uid) {
return null;
}
/**
* finds the brokerid associated with the given session.
*
* @param uid is the session uid to search for
* @return the uid associated with the session or null we cant find it.
*/
@Override
public String lookupBrokerSessionUID(UID uid) {
if (!initialized) {
throw new RuntimeException("Cluster not initialized");
}
UID buid = localcb.getBrokerSessionUID();
if (buid.equals(uid)) {
return localcb.getBrokerName();
}
return null;
}
/**
* @return true if allow configured master broker
*/
protected boolean allowMasterBroker() {
return false;
}
/**
* The master broker in the cluster (if any).
*
* @return the master broker (or null if none)
* @see ClusterManagerImpl#getBroker(String)
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public ClusteredBroker getMasterBroker() {
return null;
}
/**
* The transport (as a string) used by the cluster of brokers.
*
* @return the transport (tcp, ssl)
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public String getTransport() {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".getTransport()");
}
/**
* Returns the port configured for the cluster service.
*
* @return the port (or 0 if a dynamic port should be used)
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public int getClusterPort() {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".getClusterPort()");
}
/**
* Returns the host that the cluster service should bind to .
*
* @return the hostname (or null if the service should bind to all)
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public String getClusterHost() {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".getClusterHost()");
}
/**
* Is the cluster "highly available" ?
*
* @return true if the cluster is HA
* @throws RuntimeException if the cluster has not be initialized (which occurs the first time the MQAddress is set)
* @see ClusterManagerImpl#setMQAddress
* @see Globals#getHAEnabled()
*/
@Override
public boolean isHA() {
return false;
}
/**
* Reload cluster properties from config
*
*/
@Override
public void reloadConfig() throws BrokerException {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".reloadCluster()");
}
/**
* Initializes the cluster (loading all configuration). This methods is called the first time setMQAddress is called
* after the broker is created.
*
* @param address the address of the local broker
* @throws BrokerException if the cluster can not be initialized
* @see ClusterManagerImpl#setMQAddress
*/
@Override
public String initialize(MQAddress address) throws BrokerException {
localcb = new NoClusteredBroker(address, new UID());
localcb.setStatus(BrokerStatus.ACTIVATE_BROKER, null);
if (DEBUG) {
logger.log(Logger.INFO, "ClusterManager: " + toString());
}
initialized = true;
return localcb.getBrokerName();
}
/**
* Returns a user-readable string representing this class.
*
* @return the user-readable represeation.
*/
@Override
public String toString() {
return "NoClusterManager: [local=" + localcb + "]";
}
/**
* Gets the UID associated with the local broker
*
* @return null (this cluster type does not support session)
*/
@Override
public synchronized UID getStoreSessionUID() {
return null;
}
/**
* Gets the UID associated with the local broker
*
* @return null (this cluster type does not support session)
*/
@Override
public synchronized UID getBrokerSessionUID() {
return getLocalBroker().getBrokerSessionUID();
}
/**
*/
protected void addSupportedStoreSessionUID(UID uid) {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".addSupportedStoreSessionUID()");
}
/**
*/
@Override
public Set getSupportedStoreSessionUIDs() {
return new HashSet();
}
@Override
public MQAddress getBrokerNextToMe() {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".getBrokerNextToMe()");
}
@Override
public LinkedHashSet parseBrokerList(String values) throws MalformedURLException, UnknownHostException {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".parseBrokerList()");
}
@Override
public ClusteredBroker getBrokerByNodeName(String nodeName) throws BrokerException {
throw new UnsupportedOperationException("Unexpected call: " + getClass().getName() + ".getbrokerByNodeName()");
}
/**
* @param partitionID the partition id
*/
@Override
public void partitionAdded(UID partitionID, Object source) {
}
/**
* @param partitionID the partition id
*/
@Override
public void partitionRemoved(UID partitionID, Object source, Object destinedTo) {
}
}