-
Notifications
You must be signed in to change notification settings - Fork 471
/
TP.java
2943 lines (2383 loc) · 119 KB
/
TP.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package org.jgroups.protocols;
import org.jgroups.*;
import org.jgroups.annotations.*;
import org.jgroups.blocks.LazyRemovalCache;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.conf.PropertyConverters;
import org.jgroups.logging.LogFactory;
import org.jgroups.stack.DiagnosticsHandler;
import org.jgroups.stack.Protocol;
import org.jgroups.util.*;
import org.jgroups.util.ThreadFactory;
import org.jgroups.util.UUID;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.InterruptedIOException;
import java.lang.management.ManagementFactory;
import java.net.InetAddress;
import java.net.NetworkInterface;
import java.net.SocketException;
import java.text.NumberFormat;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
/**
* Generic transport - specific implementations should extend this abstract class.
* Features which are provided to the subclasses include
* <ul>
* <li>version checking
* <li>marshalling and unmarshalling
* <li>message bundling (handling single messages, and message lists)
* <li>incoming packet handler
* </ul>
* A subclass has to override
* <ul>
* <li>{@link #sendMulticast(org.jgroups.util.AsciiString, byte[], int, int)}
* <li>{@link #sendUnicast(org.jgroups.PhysicalAddress, byte[], int, int)}
* <li>{@link #init()}
* <li>{@link #start()}: subclasses <em>must</em> call super.start() <em>after</em> they initialize themselves
* (e.g., created their sockets).
* <li>{@link #stop()}: subclasses <em>must</em> call super.stop() after they deinitialized themselves
* <li>{@link #destroy()}
* </ul>
* The create() or start() method has to create a local address.<br>
* The {@link #receive(Address, byte[], int, int)} method must
* be called by subclasses when a unicast or multicast message has been received.
* @author Bela Ban
*/
@MBean(description="Transport protocol")
public abstract class TP extends Protocol implements DiagnosticsHandler.ProbeHandler {
protected static final byte LIST=1; // we have a list of messages rather than a single message when set
protected static final byte MULTICAST=2; // message is a multicast (versus a unicast) message when set
protected static final int MSG_OVERHEAD=Global.SHORT_SIZE + Global.BYTE_SIZE; // version + flags
protected static final boolean can_bind_to_mcast_addr;
protected static final String BUNDLE_MSG="%s: sending %d msgs (%d bytes (%.2f%% of max_bundle_size) to %d dests(s): %s";
protected static final long MIN_WAIT_BETWEEN_DISCOVERIES=TimeUnit.NANOSECONDS.convert(10, TimeUnit.SECONDS); // ns
protected static NumberFormat f;
static {
can_bind_to_mcast_addr=(Util.checkForLinux() && !Util.checkForAndroid())
|| Util.checkForSolaris()
|| Util.checkForHp()
|| Util.checkForMac();
f=NumberFormat.getNumberInstance();
f.setGroupingUsed(false);
f.setMaximumFractionDigits(2);
}
/* ------------------------------------------ JMX and Properties ------------------------------------------ */
@LocalAddress
@Property(name="bind_addr",
description="The bind address which should be used by this transport. The following special values " +
"are also recognized: GLOBAL, SITE_LOCAL, LINK_LOCAL, NON_LOOPBACK, match-interface, match-host, match-address",
defaultValueIPv4=Global.NON_LOOPBACK_ADDRESS, defaultValueIPv6=Global.NON_LOOPBACK_ADDRESS,
systemProperty={Global.BIND_ADDR},writable=false)
protected InetAddress bind_addr=null;
@Property(description="Use \"external_addr\" if you have hosts on different networks, behind " +
"firewalls. On each firewall, set up a port forwarding rule (sometimes called \"virtual server\") to " +
"the local IP (e.g. 192.168.1.100) of the host then on each host, set \"external_addr\" TCP transport " +
"parameter to the external (public IP) address of the firewall.",
systemProperty=Global.EXTERNAL_ADDR,writable=false)
protected InetAddress external_addr=null;
@Property(description="Used to map the internal port (bind_port) to an external port. Only used if > 0",
systemProperty=Global.EXTERNAL_PORT,writable=false)
protected int external_port;
@Property(name="bind_interface", converter=PropertyConverters.BindInterface.class,
description="The interface (NIC) which should be used by this transport", dependsUpon="bind_addr",
exposeAsManagedAttribute=false)
protected String bind_interface_str;
@Property(description="If true, the transport should use all available interfaces to receive multicast messages")
protected boolean receive_on_all_interfaces=false;
/**
* List<NetworkInterface> of interfaces to receive multicasts on. The multicast receive socket will listen
* on all of these interfaces. This is a comma-separated list of IP addresses or interface names. E.g.
* "192.168.5.1,eth1,127.0.0.1". Duplicates are discarded; we only bind to
* an interface once. If this property is set, it overrides receive_on_all_interfaces.
*/
@Property(converter=PropertyConverters.NetworkInterfaceList.class,
description="Comma delimited list of interfaces (IP addresses or interface names) to receive multicasts on")
protected List<NetworkInterface> receive_interfaces=null;
@Property(description="Max number of elements in the logical address cache before eviction starts")
protected int logical_addr_cache_max_size=2000;
@Property(description="Time (in ms) after which entries in the logical address cache marked as removable " +
"can be removed. 0 never removes any entries (not recommended)")
protected long logical_addr_cache_expiration=120000;
@Property(description="Interval (in ms) at which the reaper task scans logical_addr_cache and removes entries " +
"marked as removable. 0 disables reaping.")
protected long logical_addr_cache_reaper_interval=60000;
/** The port to which the transport binds. 0 means to bind to any (ephemeral) port. See also {@link #port_range} */
@Property(description="The port to which the transport binds. Default of 0 binds to any (ephemeral) port. See also port_range",writable=false)
protected int bind_port;
@Property(description="The range of valid ports, from bind_port to end_port. 0 only binds to bind_port and fails if taken")
protected int port_range=50; // 27-6-2003 bgooren, Only try one port by default
/** If true, messages sent to self are treated specially: unicast messages are looped back immediately,
* multicast messages get a local copy first and - when the real copy arrives - it will be discarded */
@Property(description="Messages to self are looped back immediately if true",deprecatedMessage="enabled by default")
@Deprecated
protected boolean loopback=true;
@Property(description="Whether or not to make a copy of a message before looping it back up. Don't use this; might " +
"get removed without warning")
protected boolean loopback_copy=false;
@Property(description="Loop back the message on a separate thread or use the current thread. Don't use this; " +
"might get removed without warning")
protected boolean loopback_separate_thread=true;
/**
* Discard packets with a different version. Usually minor version differences are okay. Setting this property
* to true means that we expect the exact same version on all incoming packets
*/
@Deprecated
@Property(description="Discard packets with a different version if true",
deprecatedMessage="incompatible packets are discarded anyway",writable=false)
protected boolean discard_incompatible_packets=true;
@Property(description="Thread naming pattern for threads in this channel. Valid values are \"pcl\": " +
"\"p\": includes the thread name, e.g. \"Incoming thread-1\", \"UDP ucast receiver\", " +
"\"c\": includes the cluster name, e.g. \"MyCluster\", " +
"\"l\": includes the local address of the current member, e.g. \"192.168.5.1:5678\"")
protected String thread_naming_pattern="cl";
@Property(name="oob_thread_pool.enabled",description="Switch for enabling thread pool for OOB messages. " +
"Default=true",writable=false)
@Deprecated
protected boolean oob_thread_pool_enabled=true;
@Property(name="oob_thread_pool.min_threads",description="Minimum thread pool size for the OOB thread pool")
protected int oob_thread_pool_min_threads=2;
@Property(name="oob_thread_pool.max_threads",description="Max thread pool size for the OOB thread pool")
protected int oob_thread_pool_max_threads=10;
@Property(name="oob_thread_pool.keep_alive_time", description="Timeout in ms to remove idle threads from the OOB pool")
protected long oob_thread_pool_keep_alive_time=30000;
@Property(name="oob_thread_pool.queue_enabled", description="Use queue to enqueue incoming OOB messages")
protected boolean oob_thread_pool_queue_enabled=false;
@Property(name="oob_thread_pool.queue_max_size",description="Maximum queue size for incoming OOB messages")
protected int oob_thread_pool_queue_max_size=500;
@Property(name="oob_thread_pool.rejection_policy",
description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String oob_thread_pool_rejection_policy="abort";
@Property(name="thread_pool.min_threads",description="Minimum thread pool size for the regular thread pool")
protected int thread_pool_min_threads=2;
@Property(name="thread_pool.max_threads",description="Maximum thread pool size for the regular thread pool")
protected int thread_pool_max_threads=10;
@Property(name="thread_pool.keep_alive_time",description="Timeout in milliseconds to remove idle thread from regular pool")
protected long thread_pool_keep_alive_time=30000;
@Property(name="thread_pool.enabled",description="Switch for enabling thread pool for regular messages")
@Deprecated
protected boolean thread_pool_enabled=true;
@Property(name="thread_pool.queue_enabled", description="Queue to enqueue incoming regular messages")
protected boolean thread_pool_queue_enabled=true;
@Property(name="thread_pool.queue_max_size", description="Maximum queue size for incoming regular messages")
protected int thread_pool_queue_max_size=10000;
@Property(name="thread_pool.rejection_policy",
description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String thread_pool_rejection_policy="abort";
@Property(name="internal_thread_pool.enabled",description="Switch for enabling thread pool for internal messages",
writable=false)
@Deprecated
protected boolean internal_thread_pool_enabled=true;
@Property(name="internal_thread_pool.min_threads",description="Minimum thread pool size for the internal thread pool")
protected int internal_thread_pool_min_threads=2;
@Property(name="internal_thread_pool.max_threads",description="Maximum thread pool size for the internal thread pool")
protected int internal_thread_pool_max_threads=4;
@Property(name="internal_thread_pool.keep_alive_time", description="Timeout in ms to remove idle threads from the internal pool")
protected long internal_thread_pool_keep_alive_time=30000;
@Property(name="internal_thread_pool.queue_enabled", description="Queue to enqueue incoming internal messages")
protected boolean internal_thread_pool_queue_enabled=true;
@Property(name="internal_thread_pool.queue_max_size",description="Maximum queue size for incoming internal messages")
protected int internal_thread_pool_queue_max_size=500;
@Property(name="internal_thread_pool.rejection_policy",
description="Thread rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String internal_thread_pool_rejection_policy="abort";
@Property(description="Type of timer to be used. Valid values are \"old\" (DefaultTimeScheduler, used up to 2.10), " +
"\"new\" or \"new2\" (TimeScheduler2), \"new3\" (TimeScheduler3) and \"wheel\". Note that this property " +
"might disappear in future releases, if one of the 3 timers is chosen as default timer")
protected String timer_type="new3";
@Property(name="timer.min_threads",description="Minimum thread pool size for the timer thread pool")
protected int timer_min_threads=2;
@Property(name="timer.max_threads",description="Max thread pool size for the timer thread pool")
protected int timer_max_threads=4;
@Property(name="timer.keep_alive_time", description="Timeout in ms to remove idle threads from the timer pool")
protected long timer_keep_alive_time=5000;
@Property(name="timer.queue_max_size", description="Max number of elements on a timer queue")
protected int timer_queue_max_size=500;
@Property(name="timer.rejection_policy",description="Timer rejection policy. Possible values are Abort, Discard, DiscardOldest and Run")
protected String timer_rejection_policy="abort"; // abort will spawn a new thread if the timer thread pool is full
// hashed timing wheel specific props
@Property(name="timer.wheel_size",
description="Number of ticks in the HashedTimingWheel timer. Only applicable if timer_type is \"wheel\"")
protected int wheel_size=200;
@Property(name="timer.tick_time",
description="Tick duration in the HashedTimingWheel timer. Only applicable if timer_type is \"wheel\"")
protected long tick_time=50L;
@Property(description="Interval (in ms) at which the time service updates its timestamp. 0 disables the time service")
protected long time_service_interval=500;
@Property(description="Enable bundling of smaller messages into bigger ones. Default is true",
deprecatedMessage="will be ignored as bundling is on by default")
@Deprecated
protected boolean enable_bundling=true;
/** Enable bundling for unicast messages. Ignored if enable_bundling is off */
@Property(description="Enable bundling of smaller messages into bigger ones for unicast messages. Default is true",
deprecatedMessage="will be ignored")
@Deprecated
protected boolean enable_unicast_bundling=true;
@Property(description="Allows the transport to pass received message batches up as MessagesBatch instances " +
"(up(MessageBatch)), rather than individual messages. This flag will be removed in a future version " +
"when batching has been implemented by all protocols")
protected boolean enable_batching=true;
@Property(description="Whether or not messages with DONT_BUNDLE set should be ignored by default (JGRP-1737). " +
"This property will be removed in a future release, so don't use it")
protected boolean ignore_dont_bundle=true;
@Property(description="Switch to enable diagnostic probing. Default is true")
protected boolean enable_diagnostics=true;
@Property(description="Address for diagnostic probing. Default is 224.0.75.75",
defaultValueIPv4="224.0.75.75",defaultValueIPv6="ff0e::0:75:75")
protected InetAddress diagnostics_addr=null;
@Property(converter=PropertyConverters.NetworkInterfaceList.class,
description="Comma delimited list of interfaces (IP addresses or interface names) that the " +
"diagnostics multicast socket should bind to")
protected List<NetworkInterface> diagnostics_bind_interfaces=null;
@Property(description="Port for diagnostic probing. Default is 7500")
protected int diagnostics_port=7500;
@Property(description="TTL of the diagnostics multicast socket")
protected int diagnostics_ttl=8;
@Property(description="Authorization passcode for diagnostics. If specified every probe query will be authorized")
protected String diagnostics_passcode;
@Property(description="If assigned enable this transport to be a singleton (shared) transport",
deprecatedMessage="Use fork channels instead")
@Deprecated
/**
* @deprecated Will be removed in 4.0. Use fork channels instead
*/
protected String singleton_name;
/** Whether or not warnings about messages from different groups are logged - private flag, not for common use */
@Property(description="whether or not warnings about messages from different groups are logged")
protected boolean log_discard_msgs=true;
@Property(description="whether or not warnings about messages from members with a different version are discarded")
protected boolean log_discard_msgs_version=true;
@Property(description="Timeout (in ms) to determine how long to wait until a request to fetch the physical address " +
"for a given logical address will be sent again. Subsequent requests for the same physical address will therefore " +
"be spaced at least who_has_cache_timeout ms apart")
protected long who_has_cache_timeout=2000;
@Property(description="Max number of attempts to fetch a physical address (when not in the cache) before giving up",
deprecatedMessage="will be ignored")
protected int physical_addr_max_fetch_attempts=1;
@Property(description="Time during which identical warnings about messages from a member with a different version " +
"will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also " +
"disables this.")
protected long suppress_time_different_version_warnings=60000;
@Property(description="Time during which identical warnings about messages from a member from a different cluster " +
"will be suppressed. 0 disables this (every warning will be logged). Setting the log level to ERROR also " +
"disables this.")
protected long suppress_time_different_cluster_warnings=60000;
/**
* Maximum number of bytes for messages to be queued until they are sent.
* This value needs to be smaller than the largest datagram packet size in case of UDP
*/
@Property(name="max_bundle_size", description="Maximum number of bytes for messages to be queued until they are sent")
protected int max_bundle_size=64000;
/**
* Max number of milliseconds until queued messages are sent. Messages are sent when max_bundle_size
* or max_bundle_timeout has been exceeded (whichever occurs faster)
*/
@Property(name="max_bundle_timeout", description="Max number of milliseconds until queued messages are sent")
protected long max_bundle_timeout=20;
@Property(description="The type of bundler used. Has to be \"sender-sends-with-timer\", \"transfer-queue\" (default) " +
"or \"sender-sends\"")
protected String bundler_type="transfer-queue";
@Property(description="The max number of elements in a bundler if the bundler supports size limitations")
protected int bundler_capacity=20000;
public void setMaxBundleSize(int size) {
if(size <= 0)
throw new IllegalArgumentException("max_bundle_size (" + size + ") is <= 0");
max_bundle_size=size;
}
public long getMaxBundleTimeout() {return max_bundle_timeout;}
public void setMaxBundleTimeout(long timeout) {
if(timeout <= 0)
throw new IllegalArgumentException("max_bundle_timeout of " + timeout + " is invalid");
max_bundle_timeout=timeout;
}
public int getMaxBundleSize() {return max_bundle_size;}
@ManagedAttribute public int getBundlerBufferSize() {
if(bundler instanceof TransferQueueBundler)
return ((TransferQueueBundler)bundler).getBufferSize();
return 0;
}
@ManagedAttribute(description="Is the logical_addr_cache reaper task running")
public boolean isLogicalAddressCacheReaperRunning() {
return logical_addr_cache_reaper != null && !logical_addr_cache_reaper.isDone();
}
@ManagedAttribute(description="Returns the average batch size of received batches")
public double getAvgBatchSize() {
return avg_batch_size.getAverage();
}
public void setOOBThreadPoolKeepAliveTime(long time) {
oob_thread_pool_keep_alive_time=time;
if(oob_thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)oob_thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
}
public long getOOBThreadPoolKeepAliveTime() {return oob_thread_pool_keep_alive_time;}
public void setOOBThreadPoolMinThreads(int size) {
oob_thread_pool_min_threads=size;
if(oob_thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)oob_thread_pool).setCorePoolSize(size);
}
public int getOOBThreadPoolMinThreads() {return oob_thread_pool_min_threads;}
public void setOOBThreadPoolMaxThreads(int size) {
oob_thread_pool_max_threads=size;
if(oob_thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)oob_thread_pool).setMaximumPoolSize(size);
}
public int getOOBThreadPoolMaxThreads() {return oob_thread_pool_max_threads;}
public void setOOBThreadPoolQueueEnabled(boolean flag) {this.oob_thread_pool_queue_enabled=flag;}
public void setThreadPoolMinThreads(int size) {
thread_pool_min_threads=size;
if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setCorePoolSize(size);
}
public int getThreadPoolMinThreads() {return thread_pool_min_threads;}
public void setThreadPoolMaxThreads(int size) {
thread_pool_max_threads=size;
if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setMaximumPoolSize(size);
}
public int getThreadPoolMaxThreads() {return thread_pool_max_threads;}
public void setThreadPoolKeepAliveTime(long time) {
thread_pool_keep_alive_time=time;
if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setKeepAliveTime(time, TimeUnit.MILLISECONDS);
}
public long getThreadPoolKeepAliveTime() {return thread_pool_keep_alive_time;}
public void setTimerMinThreads(int size) {
timer_min_threads=size;
if(timer != null)
timer.setMinThreads(size);
}
public int getTimerMinThreads() {return timer_min_threads;}
public void setTimerMaxThreads(int size) {
timer_max_threads=size;
if(timer != null)
timer.setMaxThreads(size);
}
public int getTimerMaxThreads() {return timer_max_threads;}
public void setTimerKeepAliveTime(long time) {
timer_keep_alive_time=time;
if(timer != null)
timer.setKeepAliveTime(time);
}
public long getTimerKeepAliveTime() {return timer_keep_alive_time;}
@ManagedAttribute
public int getTimerQueueSize() {
if(timer instanceof TimeScheduler2)
return ((TimeScheduler2)timer).getQueueSize();
return 0;
}
/* --------------------------------------------- JMX ---------------------------------------------- */
@ManagedAttribute(description="Number of messages sent")
protected long num_msgs_sent;
@ManagedAttribute(description="Number of messages received")
protected long num_msgs_received;
@ManagedAttribute(description="Number of single messages received")
protected long num_single_msgs_received;
@ManagedAttribute(description="Number of single messages sent")
protected long num_single_msgs_sent;
@ManagedAttribute(description="Number of single messages that were sent instead of sending a batch of 1")
protected long num_single_msgs_sent_instead_of_batch;
@ManagedAttribute(description="Number of message batches received")
protected long num_batches_received;
@ManagedAttribute(description="Number of message batches sent")
protected long num_batches_sent;
@ManagedAttribute(description="Number of bytes sent")
protected long num_bytes_sent;
@ManagedAttribute(description="Number of bytes received")
protected long num_bytes_received;
@ManagedAttribute(description="Number of messages rejected by the thread pool")
protected int num_rejected_msgs;
/** The name of the group to which this member is connected. With a shared transport, the channel name is
* in TP.ProtocolAdapter (cluster_name), and this field is not used */
@ManagedAttribute(description="Channel (cluster) name")
protected AsciiString cluster_name;
@ManagedAttribute(description="Number of OOB messages received")
protected long num_oob_msgs_received;
@ManagedAttribute(description="Number of regular messages received")
protected long num_incoming_msgs_received;
@ManagedAttribute(description="Number of internal messages received")
protected long num_internal_msgs_received;
@ManagedAttribute(description="Class of the timer implementation")
public String getTimerClass() {
return timer != null? timer.getClass().getSimpleName() : "null";
}
@ManagedAttribute(description="Name of the cluster to which this transport is connected")
public String getClusterName() {
return cluster_name != null? cluster_name.toString() : null;
}
@ManagedAttribute(description="Number of messages from members in a different cluster")
public int getDifferentClusterMessages() {
return suppress_log_different_cluster != null? suppress_log_different_cluster.getCache().size() : 0;
}
@ManagedAttribute(description="Number of messages from members with a different JGroups version")
public int getDifferentVersionMessages() {
return suppress_log_different_version != null? suppress_log_different_version.getCache().size() : 0;
}
@ManagedOperation(description="Clears the cache for messages from different clusters")
public void clearDifferentClusterCache() {
if(suppress_log_different_cluster != null)
suppress_log_different_cluster.getCache().clear();
}
@ManagedOperation(description="Clears the cache for messages from members with different versions")
public void clearDifferentVersionCache() {
if(suppress_log_different_version != null)
suppress_log_different_version.getCache().clear();
}
@ManagedAttribute(description="Type of logger used")
public static String loggerType() {return LogFactory.loggerType();}
/* --------------------------------------------- Fields ------------------------------------------------------ */
/** The address (host and port) of this member. Null by default when a shared transport is used */
protected Address local_addr;
protected PhysicalAddress local_physical_addr;
/** The members of this group (updated when a member joins or leaves). With a shared transport,
* members contains *all* members from all channels sitting on the shared transport */
protected final Set<Address> members=new CopyOnWriteArraySet<>();
/** Keeps track of connects and disconnects, in order to start and stop threads */
protected int connect_count;
//http://jira.jboss.org/jira/browse/JGRP-849
protected final ReentrantLock connectLock = new ReentrantLock();
// ================================== OOB thread pool ========================
protected Executor oob_thread_pool;
/** Factory which is used by oob_thread_pool */
protected ThreadFactory oob_thread_factory;
/** Used if oob_thread_pool is a ThreadPoolExecutor and oob_thread_pool_queue_enabled is true */
protected BlockingQueue<Runnable> oob_thread_pool_queue;
// ================================== Regular thread pool ======================
/** The thread pool which handles unmarshalling, version checks and dispatching of regular messages */
protected Executor thread_pool;
/** Factory which is used by oob_thread_pool */
protected ThreadFactory default_thread_factory;
/** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
protected BlockingQueue<Runnable> thread_pool_queue;
// ================================== Internal thread pool ======================
/** The thread pool which handles JGroups internal messages (Flag.INTERNAL) */
protected Executor internal_thread_pool;
/** Factory which is used by internal_thread_pool */
protected ThreadFactory internal_thread_factory;
/** Used if thread_pool is a ThreadPoolExecutor and thread_pool_queue_enabled is true */
protected BlockingQueue<Runnable> internal_thread_pool_queue;
// ================================== Timer thread pool =========================
protected TimeScheduler timer;
protected ThreadFactory timer_thread_factory;
protected TimeService time_service;
// ================================ Default thread factory ========================
/** Used by all threads created by JGroups outside of the thread pools */
protected ThreadFactory global_thread_factory=null;
// ================================= Default SocketFactory ========================
protected SocketFactory socket_factory=new DefaultSocketFactory();
protected Bundler bundler;
protected DiagnosticsHandler diag_handler;
protected final List<DiagnosticsHandler.ProbeHandler> preregistered_probe_handlers=new LinkedList<>();
/**
* If singleton_name is enabled, this map is used to de-multiplex incoming messages according to their cluster
* names (attached to the message by the transport anyway). The values are the next protocols above the
* transports.
*/
protected final ConcurrentMap<AsciiString,Protocol> up_prots=Util.createConcurrentMap(16, 0.75f, 16);
/** The header including the cluster name, sent with each message. Not used with a shared transport (instead
* TP.ProtocolAdapter attaches the header to the message */
protected TpHeader header;
/**
* Cache which maintains mappings between logical and physical addresses. When sending a message to a logical
* address, we look up the physical address from logical_addr_cache and send the message to the physical address
* <br/>
* The keys are logical addresses, the values physical addresses
*/
protected LazyRemovalCache<Address,PhysicalAddress> logical_addr_cache;
// last time (in ns) we sent a discovery request
protected long last_discovery_request;
Future<?> logical_addr_cache_reaper;
protected final Average avg_batch_size=new Average(20);
protected static final LazyRemovalCache.Printable<Address,LazyRemovalCache.Entry<PhysicalAddress>> print_function
=new LazyRemovalCache.Printable<Address,LazyRemovalCache.Entry<PhysicalAddress>>() {
public String print(final Address logical_addr, final LazyRemovalCache.Entry<PhysicalAddress> entry) {
StringBuilder sb=new StringBuilder();
String tmp_logical_name=UUID.get(logical_addr);
if(tmp_logical_name != null)
sb.append(tmp_logical_name).append(": ");
if(logical_addr instanceof UUID)
sb.append(((UUID)logical_addr).toStringLong());
else
sb.append(logical_addr);
sb.append(": ").append(entry).append("\n");
return sb.toString();
}
};
/** Cache keeping track of WHO_HAS requests for physical addresses (given a logical address) and expiring
* them after who_has_cache_timeout ms */
protected ExpiryCache<Address> who_has_cache;
/** Log to suppress identical warnings for messages from members with different (incompatible) versions */
protected SuppressLog<Address> suppress_log_different_version;
/** Log to suppress identical warnings for messages from members in different clusters */
protected SuppressLog<Address> suppress_log_different_cluster;
/**
* Creates the TP protocol, and initializes the state variables, does
* however not start any sockets or threads.
*/
protected TP() {
}
/** Whether or not hardware multicasting is supported */
public abstract boolean supportsMulticasting();
public boolean isMulticastCapable() {return supportsMulticasting();}
public String toString() {
if(!isSingleton())
return local_addr != null? name + "(local address: " + local_addr + ')' : name;
else
return name + " (singleton=" + singleton_name + ")";
}
@ManagedAttribute(description="The address of the channel")
public String getLocalAddress() {return local_addr != null? local_addr.toString() : null;}
@ManagedAttribute(description="The physical address of the channel")
public String getLocalPhysicalAddress() {return local_physical_addr != null? local_physical_addr.toString() : null;}
public void resetStats() {
num_msgs_sent=num_msgs_received=num_single_msgs_received=num_batches_received=num_bytes_sent=num_bytes_received=0;
num_oob_msgs_received=num_incoming_msgs_received=num_internal_msgs_received=num_single_msgs_sent=num_single_msgs_sent_instead_of_batch=num_batches_sent=0;
avg_batch_size.clear();
}
public void registerProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
if(diag_handler != null)
diag_handler.registerProbeHandler(handler);
else {
synchronized(preregistered_probe_handlers) {
preregistered_probe_handlers.add(handler);
}
}
}
public void unregisterProbeHandler(DiagnosticsHandler.ProbeHandler handler) {
if(diag_handler != null)
diag_handler.unregisterProbeHandler(handler);
}
/**
* Sets a {@link DiagnosticsHandler}. Should be set before the stack is started
* @param handler
*/
public void setDiagnosticsHandler(DiagnosticsHandler handler) {
if(handler != null) {
if(diag_handler != null)
diag_handler.stop();
diag_handler=handler;
}
}
public Bundler getBundler() {return bundler;}
/** Installs a bundler. Needs to be done before the channel is connected */
public void setBundler(Bundler bundler) {
this.bundler=bundler;
}
public void setThreadPoolQueueEnabled(boolean flag) {thread_pool_queue_enabled=flag;}
public Executor getDefaultThreadPool() {
return thread_pool;
}
public void setDefaultThreadPool(Executor thread_pool) {
if(this.thread_pool != null)
shutdownThreadPool(this.thread_pool);
this.thread_pool=thread_pool;
}
public ThreadFactory getDefaultThreadPoolThreadFactory() {
return default_thread_factory;
}
public void setDefaultThreadPoolThreadFactory(ThreadFactory factory) {
default_thread_factory=factory;
if(thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)thread_pool).setThreadFactory(factory);
}
public Executor getOOBThreadPool() {
return oob_thread_pool;
}
public void setOOBThreadPool(Executor oob_thread_pool) {
if(this.oob_thread_pool != null) {
shutdownThreadPool(this.oob_thread_pool);
}
this.oob_thread_pool=oob_thread_pool;
}
public ThreadFactory getOOBThreadPoolThreadFactory() {
return oob_thread_factory;
}
public void setOOBThreadPoolThreadFactory(ThreadFactory factory) {
oob_thread_factory=factory;
if(oob_thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)oob_thread_pool).setThreadFactory(factory);
}
public Executor getInternalThreadPool() {
return internal_thread_pool;
}
public void setInternalThreadPool(Executor internal_thread_pool) {
if(this.internal_thread_pool != null)
shutdownThreadPool(this.internal_thread_pool);
this.internal_thread_pool=internal_thread_pool;
}
public ThreadFactory getInternalThreadPoolThreadFactory() {
return internal_thread_factory;
}
public void setInternalThreadPoolThreadFactory(ThreadFactory factory) {
internal_thread_factory=factory;
if(internal_thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)internal_thread_pool).setThreadFactory(factory);
}
public ThreadFactory getTimerThreadFactory() {
return timer_thread_factory;
}
public void setTimerThreadFactory(ThreadFactory factory) {
timer_thread_factory=factory;
if(timer != null)
timer.setThreadFactory(factory);
}
public TimeScheduler getTimer() {return timer;}
/**
* Sets a new timer. This should be done before the transport is initialized; be very careful, as replacing a
* running timer with tasks in it can wreak havoc !
* @param timer
*/
public void setTimer(TimeScheduler timer) {
this.timer=timer;
}
public TimeService getTimeService() {return time_service;}
public void setTimeService(TimeService ts) {
if(ts == null)
return;
if(time_service != null)
time_service.stop();
time_service=ts;
time_service.start();
}
public ThreadFactory getThreadFactory() {
return global_thread_factory;
}
public void setThreadFactory(ThreadFactory factory) {
global_thread_factory=factory;
}
public SocketFactory getSocketFactory() {
return socket_factory;
}
public void setSocketFactory(SocketFactory factory) {
if(factory != null)
socket_factory=factory;
}
/**
* Names the current thread. Valid values are "pcl":
* p: include the previous (original) name, e.g. "Incoming thread-1", "UDP ucast receiver"
* c: include the cluster name, e.g. "MyCluster"
* l: include the local address of the current member, e.g. "192.168.5.1:5678"
*/
public String getThreadNamingPattern() {return thread_naming_pattern;}
public long getNumMessagesSent() {return num_msgs_sent;}
public long getNumMessagesReceived() {return num_msgs_received;}
public long getNumBytesSent() {return num_bytes_sent;}
public long getNumBytesReceived() {return num_bytes_received;}
public InetAddress getBindAddress() {return bind_addr;}
public void setBindAddress(InetAddress bind_addr) {this.bind_addr=bind_addr;}
public int getBindPort() {return bind_port;}
public void setBindPort(int port) {this.bind_port=port;}
public void setBindToAllInterfaces(boolean flag) {this.receive_on_all_interfaces=flag;}
public boolean isReceiveOnAllInterfaces() {return receive_on_all_interfaces;}
public List<NetworkInterface> getReceiveInterfaces() {return receive_interfaces;}
@Deprecated public static boolean isDiscardIncompatiblePackets() {return true;}
@Deprecated public static void setDiscardIncompatiblePackets(boolean flag) {}
@Deprecated public static boolean isEnableBundling() {return true;}
@Deprecated public void setEnableBundling(boolean flag) {}
@Deprecated public static boolean isEnableUnicastBundling() {return true;}
@Deprecated public void setEnableUnicastBundling(boolean enable_unicast_bundling) {}
public void setPortRange(int range) {this.port_range=range;}
public int getPortRange() {return port_range ;}
public boolean isOOBThreadPoolEnabled() { return oob_thread_pool_enabled; }
public boolean isDefaulThreadPoolEnabled() { return thread_pool_enabled; }
@Deprecated public boolean isLoopback() {return true;}
@Deprecated public void setLoopback(boolean b) {}
public ConcurrentMap<AsciiString,Protocol> getUpProtocols() {return up_prots;}
@ManagedAttribute(description="Current number of threads in the OOB thread pool")
public int getOOBPoolSize() {
return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getPoolSize() : 0;
}
@ManagedAttribute(description="Current number of active threads in the OOB thread pool")
public int getOOBPoolSizeActive() {
return oob_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)oob_thread_pool).getActiveCount() : 0;
}
public long getOOBMessages() {
return num_oob_msgs_received;
}
@ManagedAttribute(description="Number of messages in the OOB thread pool's queue")
public int getOOBQueueSize() {
return oob_thread_pool_queue != null? oob_thread_pool_queue.size() : 0;
}
public int getOOBMaxQueueSize() {
return oob_thread_pool_queue_max_size;
}
public void setOOBRejectionPolicy(String rejection_policy) {
RejectedExecutionHandler handler=Util.parseRejectionPolicy(rejection_policy);
if(oob_thread_pool instanceof ThreadPoolExecutor)
((ThreadPoolExecutor)oob_thread_pool).setRejectedExecutionHandler(new ShutdownRejectedExecutionHandler(handler));
}
@ManagedAttribute(description="Current number of threads in the default thread pool")
public int getRegularPoolSize() {
return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getPoolSize() : 0;
}
@ManagedAttribute(description="Current number of active threads in the default thread pool")
public int getRegularPoolSizeActive() {
return thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)thread_pool).getActiveCount() : 0;
}
public long getRegularMessages() {
return num_incoming_msgs_received;
}
@ManagedAttribute(description="Number of messages in the default thread pool's queue")
public int getRegularQueueSize() {
return thread_pool_queue != null? thread_pool_queue.size() : 0;
}
public int getRegularMaxQueueSize() {
return thread_pool_queue_max_size;
}
@ManagedAttribute(description="Current number of threads in the internal thread pool")
public int getInternalPoolSize() {
return internal_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)internal_thread_pool).getPoolSize() : 0;
}
@ManagedAttribute(description="Current number of active threads in the internal thread pool")
public int getInternalPoolSizeActive() {
return internal_thread_pool instanceof ThreadPoolExecutor? ((ThreadPoolExecutor)internal_thread_pool).getActiveCount() : 0;
}
public long getInternalMessages() {
return num_internal_msgs_received;
}
@ManagedAttribute(description="Number of messages in the internal thread pool's queue")
public int getInternalQueueSize() {
return internal_thread_pool_queue != null? internal_thread_pool_queue.size() : 0;
}
public int getInternalMaxQueueSize() {