-
Notifications
You must be signed in to change notification settings - Fork 476
/
Util.java
4730 lines (4133 loc) · 174 KB
/
Util.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package org.jgroups.util;
import org.jgroups.*;
import org.jgroups.annotations.ManagedAttribute;
import org.jgroups.annotations.Property;
import org.jgroups.blocks.cs.Connection;
import org.jgroups.conf.ClassConfigurator;
import org.jgroups.jmx.JmxConfigurator;
import org.jgroups.logging.Log;
import org.jgroups.protocols.*;
import org.jgroups.protocols.pbcast.GMS;
import org.jgroups.protocols.pbcast.NAKACK2;
import org.jgroups.protocols.pbcast.STABLE;
import org.jgroups.protocols.relay.SiteMaster;
import org.jgroups.protocols.relay.SiteUUID;
import org.jgroups.stack.IpAddress;
import org.jgroups.stack.Protocol;
import org.jgroups.stack.ProtocolStack;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
import org.w3c.dom.NodeList;
import javax.management.MBeanServer;
import javax.management.MBeanServerFactory;
import java.io.*;
import java.lang.annotation.Annotation;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.invoke.MethodType;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.*;
import java.math.BigInteger;
import java.net.*;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.*;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import static java.lang.System.nanoTime;
import static org.jgroups.protocols.TP.LIST;
import static org.jgroups.protocols.TP.MULTICAST;
/**
* Collection of various utility routines that can not be assigned to other classes.
* @author Bela Ban
*/
public class Util {
// Lookup from a class to the one-byte type tag used when marshalling an instance of it.
// NOTE(review): presumably filled by the add(...) calls in the static initializer - add() is not shown here
private static final Map<Class<?>,Byte> TYPES=new IdentityHashMap<>(30);
// Reverse lookup (type tag -> class), consulted when a TYPE_CLASS payload is decoded
private static final IntHashMap<Class<?>> CLASS_TYPES=new IntHashMap<>(20);
// Type tags: the first byte of a marshalled object, it selects how the remaining bytes are decoded
private static final byte TYPE_NULL = 0;
private static final byte TYPE_BOOLEAN = 1; // boolean
private static final byte TYPE_BOOLEAN_OBJ = 2; // Boolean
private static final byte TYPE_BYTE = 3; // byte
private static final byte TYPE_BYTE_OBJ = 4; // Byte
private static final byte TYPE_CHAR = 5; // char
private static final byte TYPE_CHAR_OBJ = 6; // Character
private static final byte TYPE_DOUBLE = 7; // double
private static final byte TYPE_DOUBLE_OBJ = 8; // Double
private static final byte TYPE_FLOAT = 9; // float
private static final byte TYPE_FLOAT_OBJ = 10; // Float
private static final byte TYPE_INT = 11; // int
private static final byte TYPE_INT_OBJ = 12; // Integer
private static final byte TYPE_LONG = 13; // long
private static final byte TYPE_LONG_OBJ = 14; // Long
private static final byte TYPE_SHORT = 15; // short
private static final byte TYPE_SHORT_OBJ = 16; // Short
private static final byte TYPE_STRING = 17; // ascii
private static final byte TYPE_BYTEARRAY = 18;
private static final byte TYPE_CLASS = 19; // a class
// Tags for objects that are not registered in TYPES: written by writeStreamable() / writeSerializable()
private static final byte TYPE_STREAMABLE = 50;
private static final byte TYPE_SERIALIZABLE = 51;
public static final int MAX_PORT=65535; // highest port allocatable
// NOTE(review): judging by the names these convert between method and attribute names; the users are outside this section
private static final Pattern METHOD_NAME_TO_ATTR_NAME_PATTERN=Pattern.compile("[A-Z]+");
private static final Pattern ATTR_NAME_TO_METHOD_NAME_PATTERN=Pattern.compile("_.");
// Defaults which the static initializer overrides from the Global.CCHM_* properties when those are set
protected static int CCHM_INITIAL_CAPACITY=16;
protected static float CCHM_LOAD_FACTOR=0.75f;
protected static int CCHM_CONCURRENCY_LEVEL=16;
// Assigned exactly once in the static initializer: value of property Global.DEFAULT_HEADERS, or 4 if unset
public static final int DEFAULT_HEADERS;
/**
* The max size of an address list, used in View or Digest when toString() is called. Limiting this
* reduces the amount of log data
*/
public static int MAX_LIST_PRINT_SIZE=20;
// Pre-built encodings returned as-is by objectToBuffer() (null) and marshalPrimitiveType() (booleans)
private static final byte[] TYPE_NULL_ARRAY={0};
private static final byte[] TYPE_BOOLEAN_TRUE={TYPE_BOOLEAN, 1};
private static final byte[] TYPE_BOOLEAN_FALSE={TYPE_BOOLEAN, 0};
/**
 * Returns a freshly allocated array with the unicast protocol classes; it currently contains only
 * {@link UNICAST3}.
 * @return a new array on every call
 */
public static final Class<? extends Protocol>[] getUnicastProtocols() {
    // generic array creation is not allowed, hence the raw array type
    Class[] unicast_protocols={UNICAST3.class};
    return unicast_protocols;
}
/** Scope categories for IP addresses. NOTE(review): the code using this enum is outside this section */
public enum AddressScope {GLOBAL,SITE_LOCAL,LINK_LOCAL,LOOPBACK,NON_LOOPBACK}
// NOTE(review): presumably updated by _getIpStackType() below - its body is not part of this section
private static boolean ipv4_stack_available=false, ipv6_stack_available=false;
private static final StackType ip_stack_type=_getIpStackType();
// Assigned in the static initializer: true on Linux (but not Android), Solaris, HP and Mac
public static final boolean can_bind_to_mcast_addr;
// Bundle "jg-messages", loaded in the static initializer and read by getMessage()
protected static ResourceBundle resource_bundle;
// Fibers (project Loom - Java 15)
// The handles below are resolved reflectively; a null handle means the JDK method was not found
private static final MethodHandles.Lookup LOOKUP=MethodHandles.publicLookup();
// Value of Thread.VIRTUAL, or 1 if that field cannot be read (see getVirtual())
private static final int VIRTUAL=getVirtual();
private static final MethodHandle THREAD_NEW_THREAD=getThreadNewThread();
private static final MethodHandle EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY=getNewVirtualThreadFactory();
// Class initialization: loads the message bundle, registers the type tags, detects the platform and
// applies overrides from properties. The blank finals can_bind_to_mcast_addr and DEFAULT_HEADERS are assigned here.
static {
String tmp;
// Bundle "jg-messages" for the default locale, resolved through the class loader of Util
resource_bundle=ResourceBundle.getBundle("jg-messages",Locale.getDefault(),Util.class.getClassLoader());
// Register every (type tag, class) pair; add() is defined outside this section
add(TYPE_NULL, Void.class);
add(TYPE_BOOLEAN, boolean.class);
add(TYPE_BOOLEAN_OBJ, Boolean.class);
add(TYPE_BYTE, byte.class);
add(TYPE_BYTE_OBJ, Byte.class);
add(TYPE_CHAR, char.class);
add(TYPE_CHAR_OBJ, Character.class);
add(TYPE_DOUBLE, double.class);
add(TYPE_DOUBLE_OBJ, Double.class);
add(TYPE_FLOAT, float.class);
add(TYPE_FLOAT_OBJ, Float.class);
add(TYPE_INT, int.class);
add(TYPE_INT_OBJ, Integer.class);
add(TYPE_LONG, long.class);
add(TYPE_LONG_OBJ, Long.class);
add(TYPE_SHORT, short.class);
add(TYPE_SHORT_OBJ, Short.class);
add(TYPE_STRING, String.class);
add(TYPE_BYTEARRAY, byte[].class);
add(TYPE_CLASS, Class.class);
can_bind_to_mcast_addr=(Util.checkForLinux() && !Util.checkForAndroid())
|| Util.checkForSolaris() || Util.checkForHp() || Util.checkForMac();
// Each of the following overrides is optional: a SecurityException while reading the property leaves the
// default in place. NOTE(review): a malformed number (NumberFormatException) is not caught in these blocks.
try {
String cchm_initial_capacity=SecurityActions.getProperty(Global.CCHM_INITIAL_CAPACITY);
if(cchm_initial_capacity != null)
CCHM_INITIAL_CAPACITY=Integer.parseInt(cchm_initial_capacity);
}
catch(SecurityException ignored) {
}
try {
String cchm_load_factor=SecurityActions.getProperty(Global.CCHM_LOAD_FACTOR);
if(cchm_load_factor != null)
CCHM_LOAD_FACTOR=Float.parseFloat(cchm_load_factor);
}
catch(SecurityException ignored) {
}
try {
String cchm_concurrency_level=SecurityActions.getProperty(Global.CCHM_CONCURRENCY_LEVEL);
if(cchm_concurrency_level != null)
CCHM_CONCURRENCY_LEVEL=Integer.parseInt(cchm_concurrency_level);
}
catch(SecurityException ignored) {
}
try {
tmp=SecurityActions.getProperty(Global.MAX_LIST_PRINT_SIZE);
if(tmp != null)
MAX_LIST_PRINT_SIZE=Integer.parseInt(tmp);
}
catch(SecurityException ignored) {
}
// DEFAULT_HEADERS is final and must be assigned: 4 unless the property is set; any failure aborts
// class initialization with an IllegalArgumentException carrying the original cause
try {
tmp=SecurityActions.getProperty(Global.DEFAULT_HEADERS);
DEFAULT_HEADERS=tmp != null? Integer.parseInt(tmp) : 4;
}
catch(Throwable t) {
throw new IllegalArgumentException(String.format("property %s has an incorrect value", Global.DEFAULT_HEADERS), t);
}
}
/**
 * Tells whether the method handle for {@code Thread.newThread(String,int,Runnable)} could be resolved.
 * @return true if the handle is present, false otherwise
 */
public static boolean fibersAvailable() {
    final boolean handle_missing=THREAD_NEW_THREAD == null;
    return !handle_missing;
}
/**
 * Reads the static int field {@code Thread.VIRTUAL} through a method handle.
 * @return the value of the field, or 1 if the field cannot be found or read
 */
protected static int getVirtual() {
    try {
        MethodHandle virtual_getter=LOOKUP.findStaticGetter(Thread.class, "VIRTUAL", int.class);
        // invokeExact() is signature-polymorphic: the (int) cast is part of the call's type descriptor
        return (int)virtual_getter.invokeExact();
    }
    catch(Throwable unavailable) {
        return 1;
    }
}
/**
 * Looks up the static method {@code Thread.newThread(String,int,Runnable)} returning a {@link Thread}.
 * @return the method handle, or null if the lookup fails
 */
protected static MethodHandle getThreadNewThread() {
    final MethodType signature=MethodType.methodType(Thread.class, String.class, int.class, Runnable.class);
    MethodHandle handle;
    try {
        handle=LOOKUP.findStatic(Thread.class, "newThread", signature);
    }
    catch(Exception not_found) {
        handle=null;
    }
    return handle;
}
/**
 * Looks up the static no-arg method {@code Executors.newVirtualThreadExecutor()} returning an
 * {@link ExecutorService}.
 * @return the method handle, or null if the lookup fails
 */
protected static MethodHandle getNewVirtualThreadFactory() {
    final MethodType signature=MethodType.methodType(ExecutorService.class);
    MethodHandle handle;
    try {
        handle=LOOKUP.findStatic(Executors.class, "newVirtualThreadExecutor", signature);
    }
    catch(Exception not_found) {
        handle=null;
    }
    return handle;
}
/**
 * Creates an executor by invoking the reflectively resolved {@code Executors.newVirtualThreadExecutor()}.
 * @return the executor returned by the factory method
 * @throws IllegalStateException if the factory method was not found, or if invoking it failed (the
 *                               original failure is attached as cause)
 */
public static ExecutorService createFiberThreadPool() {
    if(EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY == null)
        throw new IllegalStateException("failed to create fiber thread pool (factory is null)");
    try {
        // invokeExact() is signature-polymorphic: the cast must name the exact return type of the handle
        return (ExecutorService)EXECUTORS_NEW_VIRTUAL_THREAD_FACTORY.invokeExact();
    }
    catch(Throwable t) {
        // keep the cause so that the stack trace of the real failure is not lost
        throw new IllegalStateException(String.format("failed to create fiber thread pool: %s", t), t);
    }
}
/**
 * Creates a thread through the reflectively resolved {@code Thread.newThread(String,int,Runnable)}.
 * If that method is not available (e.g. JDK < 15/Loom), or invoking it fails, a regular thread is created.
 * @param r The runnable to be run by the thread
 * @param name The name of the thread
 * @return the new (unstarted) thread
 */
public static Thread createFiber(Runnable r, String name) {
    if(THREAD_NEW_THREAD != null) {
        try {
            return (Thread)THREAD_NEW_THREAD.invokeExact(name, VIRTUAL, r);
        }
        catch(Throwable ignored) {
            // fall through to the regular thread below
        }
    }
    return new Thread(r, name);
}
/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that condition is true.
 * @param condition The condition to check
 */
public static void assertTrue(boolean condition) {
    assertTrue(null, condition);
}

/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that condition is true.
 * @param message The detail message of the AssertionError; if null, no detail message is used
 * @param condition The condition to check
 */
public static void assertTrue(String message,boolean condition) {
    if(message == null) {
        assert condition;
        return;
    }
    assert condition : message;
}
/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that condition is false.
 * @param condition The condition to check
 */
public static void assertFalse(boolean condition) {
    assert !condition;
}

/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that condition is false.
 * @param message The detail message of the AssertionError; if null, no detail message is used
 * @param condition The condition to check
 */
public static void assertFalse(String message,boolean condition) {
    if(message == null) {
        assert !condition;
        return;
    }
    assert !condition : message;
}
/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that val1 equals val2.
 * The comparison is null-safe: two nulls are equal, a null and a non-null value are not.
 * @param message The detail message of the AssertionError; if null, no detail message is used
 * @param val1 The first value (may be null)
 * @param val2 The second value (may be null)
 */
public static void assertEquals(String message,Object val1,Object val2) {
    // Objects.equals() instead of val1.equals(val2): a null val1 must fail the assertion, not throw an NPE
    if(message != null) {
        assert Objects.equals(val1, val2) : message;
    }
    else {
        assert Objects.equals(val1, val2);
    }
}
/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that val is not null.
 * @param message The detail message of the AssertionError; if null, no detail message is used
 * @param val The value to check
 */
public static void assertNotNull(String message,Object val) {
    if(message == null) {
        assert val != null;
        return;
    }
    assert val != null : message;
}

/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that val is not null.
 * @param val The value to check
 */
public static void assertNotNull(Object val) {
    assert val != null;
}
/**
 * Asserts (via the {@code assert} keyword, so only when assertions are enabled) that val is null.
 * @param message The detail message of the AssertionError; if null, no detail message is used
 * @param val The value to check
 */
public static void assertNull(String message,Object val) {
    if(message == null) {
        assert val == null;
        return;
    }
    assert val == null : message;
}
/**
 * Returns the smallest power of two which is greater than or equal to num.
 * @param num The number; values <= 0 yield 1
 * @return num itself if it is a power of two, otherwise the next higher power of two. Note that for
 *         num > 2^30 the shift overflows and {@link Integer#MIN_VALUE} is returned.
 */
public static int getNextHigherPowerOfTwo(int num) {
    if(num <= 0)
        return 1;
    final int top_bit=Integer.highestOneBit(num);
    // top_bit never exceeds num, so num is a power of two exactly when both are equal
    if(num == top_bit)
        return num;
    return top_bit << 1;
}
/**
 * Wraps msg in the escape sequences ESC[1m and ESC[0m.
 * @param msg The text to wrap; a null value is rendered as "null"
 * @return msg surrounded by the two escape sequences
 */
public static String bold(String msg) {
    return "\033[1m" + msg + "\033[0m";
}
/**
 * Looks up a string in the resource bundle of this class.
 * @param key The key of the string
 * @return the string registered under key, or null if key is null
 */
public static String getMessage(String key) {
    if(key == null)
        return null;
    return resource_bundle.getString(key);
}
/**
 * Returns a default stack for testing with transport = SHARED_LOOPBACK
 * @param additional_protocols Any number of protocols to add to the top of the returned protocol list
 * @return a new array with the default protocols, followed by additional_protocols (if any)
 */
public static Protocol[] getTestStack(Protocol... additional_protocols) {
    final Protocol[] defaults={
      new SHARED_LOOPBACK(),
      new SHARED_LOOPBACK_PING(),
      new NAKACK2(),
      new UNICAST3(),
      new STABLE(),
      new GMS().setJoinTimeout(1000),
      new FRAG2().setFragSize(8000)
    };
    if(additional_protocols == null)
        return defaults;
    // defaults first, then the additional protocols behind them
    final Protocol[] combined=new Protocol[defaults.length + additional_protocols.length];
    System.arraycopy(defaults, 0, combined, 0, defaults.length);
    System.arraycopy(additional_protocols, 0, combined, defaults.length, additional_protocols.length);
    return combined;
}
/**
 * Blocks until all channels have the same view, polling every interval ms. If the channels don't have
 * the same view (of size channels.length) after timeout ms, the views are checked one last time and a
 * TimeoutException listing all views is thrown if they still differ.
 * @param timeout How long to wait (max in ms), needs to be > 0
 * @param interval Check every interval ms, needs to be smaller than timeout
 * @param channels The channels which should form the view. The expected view size is channels.length.
 *                 If null or empty, the method returns immediately
 * @throws IllegalArgumentException if timeout or interval are invalid
 * @throws TimeoutException if the views didn't converge in time
 */
public static void waitUntilAllChannelsHaveSameView(long timeout, long interval, JChannel... channels) throws TimeoutException {
    if(timeout <= 0 || interval >= timeout)
        throw new IllegalArgumentException("interval needs to be smaller than timeout or timeout needs to be > 0");
    if(channels == null || channels.length == 0)
        return;
    final int expected_size=channels.length;
    final long deadline=System.currentTimeMillis() + timeout;
    while(System.currentTimeMillis() <= deadline) {
        final View reference=channels[0].getView();
        boolean converged=true;
        for(JChannel ch: channels) {
            View current=ch.getView();
            boolean matches=reference != null && Objects.equals(current, reference) && current.size() == expected_size;
            if(!matches) {
                converged=false;
                break;
            }
        }
        if(converged)
            return;
        Util.sleep(interval);
    }
    // timed out: take a snapshot of all views (also used for the error message) and check one last time
    final StringBuilder dump=new StringBuilder();
    final View[] snapshot=new View[expected_size];
    for(int i=0; i < expected_size; i++) {
        snapshot[i]=channels[i].getView();
        dump.append(channels[i].getName()).append(": ").append(snapshot[i]).append("\n");
    }
    final View reference=channels[0].getView();
    for(View current: snapshot) {
        boolean matches=current != null && Objects.equals(current, reference) && current.size() == expected_size;
        if(!matches)
            throw new TimeoutException("Timeout " + timeout + " kicked in, views are:\n" + dump);
    }
}
/**
 * Blocks until all channels in the collection have the same view; delegates to
 * {@link #waitUntilAllChannelsHaveSameView(long, long, JChannel...)}.
 * @param timeout How long to wait (max in ms)
 * @param interval Check every interval ms
 * @param channels The channels which should form the view. A null collection is treated like an empty
 *                 one (the varargs variant returns immediately for an empty array)
 * @throws TimeoutException if the views didn't converge in time
 */
public static void waitUntilAllChannelsHaveSameView(long timeout, long interval,
                                                    Collection<JChannel> channels) throws TimeoutException {
    // The array itself can never be null, so the null check has to be made on the collection
    JChannel[] tmp=channels != null? channels.toArray(new JChannel[0]) : new JChannel[0];
    waitUntilAllChannelsHaveSameView(timeout, interval, tmp);
}
/**
 * Polls condition every interval ms until it is true or timeout ms have elapsed.
 * @param timeout The max time to wait (in ms)
 * @param interval The time to sleep between 2 evaluations of condition (in ms)
 * @param condition The condition to evaluate
 * @throws TimeoutException if condition was not true within timeout ms
 */
public static void waitUntil(long timeout, long interval, BooleanSupplier condition) throws TimeoutException {
    waitUntil(timeout, interval, condition, null);
}

/**
 * Polls condition every interval ms until it is true or timeout ms have elapsed.
 * @param timeout The max time to wait (in ms)
 * @param interval The time to sleep between 2 evaluations of condition (in ms)
 * @param condition The condition to evaluate
 * @param msg Supplies text appended to the message of the TimeoutException. May be null; it is only
 *            invoked when the timeout has elapsed
 * @throws TimeoutException if condition was not true within timeout ms
 */
public static void waitUntil(long timeout, long interval, BooleanSupplier condition, Supplier<String> msg) throws TimeoutException {
    final long deadline=System.currentTimeMillis() + timeout;
    for(;;) {
        if(System.currentTimeMillis() > deadline)
            break;
        if(condition.getAsBoolean())
            return;
        Util.sleep(interval);
    }
    final String detail=msg == null? "" : ": " + msg.get();
    throw new TimeoutException(String.format("Timeout %d kicked in%s", timeout, detail));
}
/**
 * Checks whether all channels have the same view, and whether that view has channels.length members.
 * @param channels The channels to check
 * @return true if every channel has the same non-null view of size channels.length, or if channels is
 *         null or empty (nothing to compare); false otherwise, e.g. when a channel has no view yet
 */
public static boolean allChannelsHaveSameView(JChannel... channels) {
    if(channels == null || channels.length == 0)
        return true;
    View first=channels[0].getView();
    if(first == null) // e.g. a channel which is not connected: there is no common view
        return false;
    for(JChannel ch : channels) {
        View view=ch.getView();
        // first.equals(view) is false for a null view, so view.size() is never called on null
        if(!first.equals(view) || view.size() != channels.length)
            return false;
    }
    return true;
}
/**
 * Asserts that {@link #allChannelsHaveSameView(JChannel...)} is true for channels. As this uses the
 * {@code assert} keyword, nothing is evaluated unless assertions are enabled.
 * @param channels The channels to check
 */
public static void assertAllChannelsHaveSameView(JChannel ... channels) {
// the views are only printed (through printViews()) when the assertion fails
assert allChannelsHaveSameView(channels) : String.format("channels have different views:\n%s\n", printViews(channels));
}
/**
 * Renders the name and the view of every channel, one channel per line.
 * @param channels The channels to print
 * @return a string with one "name: view" line per channel
 */
public static String printViews(JChannel ... channels) {
    return Arrays.stream(channels)
      .map(ch -> String.format("%s: %s\n", ch.name(), ch.getView()))
      .collect(Collectors.joining());
}
/**
 * Waits until a list has the expected number of elements, then asserts (via the {@code assert} keyword)
 * that the size matches.
 * @param list The list
 * @param expected_size The expected size
 * @param timeout The time to wait (in ms)
 * @param interval The interval at which to get the size of the list (in ms)
 * @param <T> The type of the list
 * @throws IllegalStateException if list is null
 */
public static <T> void waitUntilListHasSize(List<T> list,int expected_size,long timeout,long interval) {
    if(list == null)
        throw new IllegalStateException("list is null");
    final long deadline=System.currentTimeMillis() + timeout;
    // the time is checked first, so the size is not polled any more once the deadline has passed
    while(System.currentTimeMillis() < deadline && list.size() != expected_size)
        Util.sleep(interval);
    assert list.size() == expected_size : "list doesn't have the expected (" + expected_size + ") elements: " + list;
}
/**
 * Removes mbr from the view of every given channel by installing, directly in GMS, a new view without
 * mbr. The new view has the first remaining member as creator and the ID of the current view + 1.
 * Channels that are null, have no GMS protocol, have no view, or whose view doesn't contain mbr are
 * skipped; so are channels whose view contains only mbr, as no successor view can be created for them.
 * @param mbr The member to remove. If null, nothing is done
 * @param channels The channels whose views are changed. If null, nothing is done
 */
public static void removeFromViews(Address mbr, JChannel ... channels) {
    if(mbr == null || channels == null)
        return;
    for(JChannel ch: channels) {
        if(ch == null)
            continue;
        GMS gms=ch.getProtocolStack().findProtocol(GMS.class);
        if(gms == null) continue;
        View v=gms.view();
        if(v != null && v.containsMember(mbr)) {
            List<Address> mbrs=new ArrayList<>(v.getMembers());
            mbrs.remove(mbr);
            if(mbrs.isEmpty()) // mbr was the only member: mbrs.get(0) below would throw
                continue;
            long id=v.getViewId().getId() + 1;
            View next=View.create(mbrs.get(0), id, mbrs);
            gms.installView(next);
        }
    }
}
/**
 * Creates a byte array consisting of t1 (long), q1 (double), the length of the digest (int) and the
 * digest computed by {@link #createDigest(String, long, double)}, in this order.
 * @param passcode The passcode passed to createDigest()
 * @param t1 The long value passed to createDigest() and written first
 * @param q1 The double value passed to createDigest() and written second
 * @return the serialized bytes
 */
public static byte[] createAuthenticationDigest(String passcode,long t1,double q1) throws IOException,
  NoSuchAlgorithmException {
    final byte[] hash=createDigest(passcode, t1, q1);
    final ByteArrayOutputStream sink=new ByteArrayOutputStream(512);
    final DataOutputStream writer=new DataOutputStream(sink);
    writer.writeLong(t1);
    writer.writeDouble(q1);
    writer.writeInt(hash.length);
    writer.write(hash);
    writer.flush();
    return sink.toByteArray();
}
/**
 * Computes a "SHA" digest over the bytes of passcode (platform default charset), followed by the 8
 * bytes of t1 and the 8 bytes of q1.
 * <p>
 * NOTE: before the buffer was flipped, {@code md.update(bb)} saw a buffer with no remaining bytes, so
 * t1 and q1 never contributed to the digest. Digests computed with this version therefore differ from
 * the ones computed by the old code; both sides of an exchange need to use the same version.
 * @param passcode The passcode, must not be null
 * @param t1 The long value mixed into the digest
 * @param q1 The double value mixed into the digest
 * @return the digest
 */
public static byte[] createDigest(String passcode,long t1,double q1)
  throws IOException, NoSuchAlgorithmException {
    MessageDigest md=MessageDigest.getInstance("SHA");
    md.update(passcode.getBytes());
    ByteBuffer bb=ByteBuffer.allocate(16); //8 bytes for long and double each
    bb.putLong(t1);
    bb.putDouble(q1);
    // update(ByteBuffer) consumes the bytes between position and limit: flip to expose the 16 bytes written
    bb.flip();
    md.update(bb);
    return md.digest();
}
/**
 * Connects sock to dest. If dest is an IPv6 address with a non-zero scope ID, the address is first
 * converted into an unscoped one (same raw address and port).
 * @param sock The socket to connect
 * @param dest The destination address
 * @param sock_conn_timeout The timeout passed to {@link Socket#connect(SocketAddress, int)}
 * @throws IOException if the address conversion or the connect fails
 */
public static void connect(Socket sock,SocketAddress dest,int sock_conn_timeout) throws IOException {
    SocketAddress target=dest;
    if(dest instanceof InetSocketAddress) {
        InetSocketAddress sock_addr=(InetSocketAddress)dest;
        InetAddress ip=sock_addr.getAddress();
        if(ip instanceof Inet6Address && ((Inet6Address)ip).getScopeId() != 0)
            target=new InetSocketAddress(InetAddress.getByAddress(ip.getAddress()), sock_addr.getPort());
    }
    sock.connect(target, sock_conn_timeout);
}
/**
 * Connects ch to dest. If dest is an IPv6 address with a non-zero scope ID, the address is first
 * converted into an unscoped one (same raw address and port).
 * @param ch The channel to connect
 * @param dest The destination address
 * @return the result of {@link SocketChannel#connect(SocketAddress)}
 * @throws IOException if the address conversion or the connect fails
 */
public static boolean connect(SocketChannel ch, SocketAddress dest) throws IOException {
    SocketAddress target=dest;
    if(dest instanceof InetSocketAddress) {
        InetSocketAddress sock_addr=(InetSocketAddress)dest;
        InetAddress ip=sock_addr.getAddress();
        if(ip instanceof Inet6Address && ((Inet6Address)ip).getScopeId() != 0)
            target=new InetSocketAddress(InetAddress.getByAddress(ip.getAddress()), sock_addr.getPort());
    }
    return ch.connect(target);
}
/**
 * Closes closeable, ignoring anything thrown by close().
 * @param closeable The resource to close; null is a no-op
 */
public static void close(Closeable closeable) {
    if(closeable == null)
        return;
    try {
        closeable.close();
    }
    catch(Throwable ignored) {
    }
}
/**
 * Closes all given resources in array order; each one is closed via {@link #close(Closeable)}.
 * @param closeables The resources to close; a null array is a no-op
 */
public static void close(Closeable... closeables) {
    if(closeables == null)
        return;
    for(int i=0; i < closeables.length; i++)
        Util.close(closeables[i]);
}
/**
 * Closes all given resources in reverse array order (last element first); each one is closed via
 * {@link #close(Closeable)}.
 * @param closeables The resources to close; a null array is a no-op
 */
public static void closeReverse(Closeable... closeables) {
    if(closeables == null)
        return;
    int idx=closeables.length;
    while(--idx >= 0)
        Util.close(closeables[idx]);
}
/**
 * Closes all non-coordinators first, in parallel, then closes the coord. This should produce just 2 views
 * @param channels The channels to close; a null array is a no-op
 */
public static void closeFast(JChannel ... channels) {
    if(channels == null)
        return;
    // step 1: close all channels which are not coordinator (in parallel)
    Arrays.stream(channels).parallel().filter(ch -> !isCoordinator(ch)).forEach(JChannel::close);
    // step 2: wait (up to 5s) until all of these channels are closed; a timeout is deliberately ignored
    try {
        Util.waitUntil(5000, 100,
                       () -> Arrays.stream(channels).filter(ch -> !isCoordinator(ch)).allMatch(JChannel::isClosed));
    }
    catch(TimeoutException ignored) {
    }
    // step 3: close the coordinator's channel
    Arrays.stream(channels).filter(Util::isCoordinator).forEach(JChannel::close);
}
/**
 * Drops messages to/from other members and then closes the channel. Note that this member won't get excluded from
 * the view until failure detection has kicked in and the new coord installed the new view.
 * <p>
 * A DISCARD protocol discarding all messages is inserted above the transport, the server socket of
 * FD_SOCK (if present) is stopped and - if the channel has a view and a GMS protocol - a view containing
 * only this member is installed before the channel is closed.
 * @param ch The channel to shut down
 */
public static void shutdown(JChannel ch) throws Exception {
    DISCARD discard=new DISCARD();
    discard.setLocalAddress(ch.getAddress());
    discard.discardAll(true);
    ProtocolStack stack=ch.getProtocolStack();
    TP transport=stack.getTransport();
    stack.insertProtocol(discard,ProtocolStack.Position.ABOVE,transport.getClass());
    //abruptly shutdown FD_SOCK just as in real life when member gets killed non gracefully
    FD_SOCK fd=stack.findProtocol(FD_SOCK.class);
    if(fd != null)
        fd.stopServerSocket(false);
    View view=ch.getView();
    if(view != null) {
        // a stack without GMS cannot install a view: skip the injection instead of failing with an NPE,
        // so that the channel is still closed below
        GMS gms=stack.findProtocol(GMS.class);
        if(gms != null) {
            ViewId vid=view.getViewId();
            List<Address> members=Collections.singletonList(ch.getAddress());
            ViewId new_vid=new ViewId(ch.getAddress(),vid.getId() + 1);
            View new_view=new View(new_vid,members);
            // inject view in which the shut down member is the only element
            gms.installView(new_view);
        }
    }
    Util.close(ch);
}
/**
 * Sets the bits of flag in bits.
 * @param bits The current bits
 * @param flag The bits to set
 * @return bits OR-ed with flag
 */
public static byte setFlag(byte bits,byte flag) {
    return (byte)(bits | flag);
}
/**
 * Checks whether all bits of flag are set in bits.
 * @param bits The bits to test
 * @param flag The bits to look for
 * @return true if every bit of flag is also set in bits (always true for flag == 0)
 */
public static boolean isFlagSet(byte bits,byte flag) {
    final int masked=bits & flag;
    return masked == flag;
}
/**
 * Clears the bits of flag in bits.
 * @param bits The current bits
 * @param flag The bits to clear
 * @return bits with all bits of flag cleared
 */
public static byte clearFlags(byte bits,byte flag) {
    return (byte)(bits & ~flag);
}
/**
 * Renders the {@link Message.Flag}s which are set in flags, separated by '|'.
 * @param flags The flags
 * @return the names of all set flags (in declaration order), or an empty string if none is set
 */
public static String flagsToString(short flags) {
    final StringJoiner joined=new StringJoiner("|");
    for(Message.Flag flag: Message.Flag.values()) {
        if(isFlagSet(flags, flag))
            joined.add(String.valueOf(flag));
    }
    return joined.toString();
}
/**
 * Renders the {@link Message.TransientFlag}s which are set in flags, separated by '|'.
 * @param flags The transient flags
 * @return the names of all set flags (in declaration order), or an empty string if none is set
 */
public static String transientFlagsToString(short flags) {
    final StringJoiner joined=new StringJoiner("|");
    for(Message.TransientFlag flag: Message.TransientFlag.values()) {
        if(isTransientFlagSet(flags, flag))
            joined.add(String.valueOf(flag));
    }
    return joined.toString();
}
/**
 * Checks whether flag is set in flags.
 * @param flags The flags to test
 * @param flag The flag to look for
 * @return true if flag is non-null and all bits of its value are set in flags
 */
public static boolean isFlagSet(short flags, Message.Flag flag) {
    if(flag == null)
        return false;
    return (flags & flag.value()) == flag.value();
}
/**
 * Checks whether the transient flag is set in flags.
 * @param flags The flags to test
 * @param flag The transient flag to look for
 * @return true if flag is non-null and all bits of its value are set in flags
 */
public static boolean isTransientFlagSet(short flags, Message.TransientFlag flag) {
    if(flag == null)
        return false;
    return (flags & flag.value()) == flag.value();
}
/**
 * Copies a message. Copies only headers with IDs >= starting_id or IDs which are in the copy_only_ids list
 * @param msg The message to copy
 * @param copy_buffer Passed to {@code Message.copy(boolean, boolean)} as first argument
 * @param starting_id Headers with an ID greater than or equal to this value are copied
 * @param copy_only_ids IDs of headers which are copied regardless of starting_id
 * @return the copy (created without headers), with the selected headers added
 */
public static Message copy(Message msg, boolean copy_buffer, short starting_id, short... copy_only_ids) {
    final Message copied=msg.copy(copy_buffer, false);
    msg.getHeaders().forEach((key, hdr) -> {
        short hdr_id=key;
        if(hdr_id >= starting_id || Util.containsId(hdr_id, copy_only_ids))
            copied.putHeader(hdr_id, hdr);
    });
    return copied;
}
/**
 * Creates an object from a byte buffer
 * @param buffer The marshalled object; the entire array is used
 * @return the unmarshalled object, or null if buffer is null
 */
public static <T extends Object> T objectFromByteBuffer(byte[] buffer) throws IOException, ClassNotFoundException {
    if(buffer != null)
        return objectFromByteBuffer(buffer, 0, buffer.length);
    return null;
}

/**
 * Creates an object from a region of a byte buffer, without a specific class loader.
 * @param buffer The array containing the marshalled object
 * @param offset The index of the first byte (the type byte)
 * @param length The number of bytes to use
 * @return the unmarshalled object, or null if buffer is null
 */
public static <T extends Object> T objectFromByteBuffer(byte[] buffer,int offset,int length) throws IOException, ClassNotFoundException {
    final ClassLoader no_loader=null;
    return objectFromByteBuffer(buffer, offset, length, no_loader);
}
/**
 * Creates an object from the region described by a {@link ByteArray}.
 * @param b The array, offset and length of the marshalled object
 * @param loader The class loader passed on to the unmarshalling code (may be null)
 * @return the unmarshalled object
 */
public static <T extends Object> T objectFromBuffer(ByteArray b, ClassLoader loader) throws IOException, ClassNotFoundException {
    final byte[] array=b.getArray();
    return objectFromByteBuffer(array, b.getOffset(), b.getLength(), loader);
}
/**
 * Creates an object from a region of a byte buffer. The first byte of the region is the type tag which
 * determines how the remaining bytes are interpreted.
 * @param buffer The array containing the marshalled object
 * @param offset The index of the type byte
 * @param length The number of bytes of the region, including the type byte
 * @param loader The class loader used for streamable and serializable objects (may be null)
 * @return the unmarshalled object, or null if buffer is null or the type is TYPE_NULL
 * @throws IllegalArgumentException if the type byte is unknown
 */
public static <T extends Object> T objectFromByteBuffer(byte[] buffer,int offset,int length, ClassLoader loader) throws IOException, ClassNotFoundException {
    if(buffer == null)
        return null;
    final byte type=buffer[offset];
    // the payload starts right behind the type byte
    final int payload_offset=offset + 1, payload_length=length - 1;
    switch(type) {
        case TYPE_NULL:
            return null;
        case TYPE_STREAMABLE: {
            DataInput streamable_in=new ByteArrayDataInputStream(buffer, payload_offset, payload_length);
            return readGenericStreamable(streamable_in, loader);
        }
        case TYPE_SERIALIZABLE: { // the object is Externalizable or Serializable
            InputStream raw_in=new ByteArrayInputStream(buffer, payload_offset, payload_length);
            try(ObjectInputStream object_in=new ObjectInputStreamWithClassloader(raw_in, loader)) {
                return (T)object_in.readObject();
            }
        }
        case TYPE_BOOLEAN: case TYPE_BOOLEAN_OBJ:
            return (T)(Boolean)(buffer[payload_offset] == 1);
        case TYPE_BYTE: case TYPE_BYTE_OBJ:
            return (T)(Byte)buffer[payload_offset];
        case TYPE_CHAR: case TYPE_CHAR_OBJ:
            return (T)(Character)Bits.readChar(buffer, payload_offset);
        case TYPE_DOUBLE: case TYPE_DOUBLE_OBJ:
            return (T)(Double)Bits.readDouble(buffer, payload_offset);
        case TYPE_FLOAT: case TYPE_FLOAT_OBJ:
            return (T)(Float)Bits.readFloat(buffer, payload_offset);
        case TYPE_INT: case TYPE_INT_OBJ:
            return (T)(Integer)Bits.readIntCompressed(buffer, payload_offset);
        case TYPE_LONG: case TYPE_LONG_OBJ:
            return (T)(Long)Bits.readLongCompressed(buffer, payload_offset);
        case TYPE_SHORT: case TYPE_SHORT_OBJ:
            return (T)(Short)Bits.readShort(buffer, payload_offset);
        case TYPE_STRING: {
            DataInput string_in=new ByteArrayDataInputStream(buffer, payload_offset, payload_length);
            return (T)readString(string_in);
        }
        case TYPE_BYTEARRAY: {
            // no length field: the byte array is the entire rest of the region
            byte[] copy=new byte[payload_length];
            System.arraycopy(buffer, payload_offset, copy, 0, payload_length);
            return (T)copy;
        }
        case TYPE_CLASS:
            return (T)CLASS_TYPES.get(buffer[payload_offset]);
        default:
            throw new IllegalArgumentException("type " + type + " is invalid");
    }
}
/**
 * Parses an object from a {@link ByteBuffer}. Note that this changes the position of the buffer, so if this is
 * not desired, use {@link ByteBuffer#duplicate()} to create a copy and pass the copy to this method.
 * @param buffer The buffer; the byte at its current position is the type tag
 * @param loader The class loader used for streamable and serializable objects (may be null)
 * @return the unmarshalled object, or null if buffer is null or the type is TYPE_NULL
 * @throws IllegalArgumentException if the type byte is unknown
 */
public static <T extends Object> T objectFromByteBuffer(ByteBuffer buffer, ClassLoader loader) throws Exception {
    if(buffer == null)
        return null;
    final byte type=buffer.get();
    switch(type) {
        case TYPE_NULL:
            return null;
        case TYPE_STREAMABLE: {
            DataInput streamable_in=new ByteBufferInputStream(buffer);
            return readGenericStreamable(streamable_in, loader);
        }
        case TYPE_SERIALIZABLE: { // the object is Externalizable or Serializable
            InputStream raw_in=new ByteBufferInputStream(buffer);
            try(ObjectInputStream object_in=new ObjectInputStreamWithClassloader(raw_in, loader)) {
                return (T)object_in.readObject();
            }
        }
        case TYPE_BOOLEAN: case TYPE_BOOLEAN_OBJ:
            return (T)(Boolean)(buffer.get() == 1);
        case TYPE_BYTE: case TYPE_BYTE_OBJ:
            return (T)(Byte)buffer.get();
        case TYPE_CHAR: case TYPE_CHAR_OBJ:
            return (T)(Character)Bits.readChar(buffer);
        case TYPE_DOUBLE: case TYPE_DOUBLE_OBJ:
            return (T)(Double)Bits.readDouble(buffer);
        case TYPE_FLOAT: case TYPE_FLOAT_OBJ:
            return (T)(Float)Bits.readFloat(buffer);
        case TYPE_INT: case TYPE_INT_OBJ:
            return (T)(Integer)Bits.readIntCompressed(buffer);
        case TYPE_LONG: case TYPE_LONG_OBJ:
            return (T)(Long)Bits.readLongCompressed(buffer);
        case TYPE_SHORT: case TYPE_SHORT_OBJ:
            return (T)(Short)Bits.readShort(buffer);
        case TYPE_BYTEARRAY: {
            // no length field: the byte array is everything which remains in the buffer
            byte[] rest=new byte[buffer.remaining()];
            buffer.get(rest);
            return (T)rest;
        }
        case TYPE_STRING: {
            DataInput string_in=new ByteBufferInputStream(buffer);
            return (T)readString(string_in);
        }
        case TYPE_CLASS:
            return (T)CLASS_TYPES.get(buffer.get());
        default:
            throw new IllegalArgumentException("type " + type + " is invalid");
    }
}
/**
 * Marshals an object with {@link #objectToBuffer(Object)} and returns the result of
 * {@code ByteArray.getBytes()}.
 * @param obj The object to marshal (may be null)
 * @return the marshalled object
 */
public static byte[] objectToByteBuffer(Object obj) throws IOException {
    final ByteArray marshalled=objectToBuffer(obj);
    return marshalled.getBytes();
}
/**
 * Serializes/Streams an object into a byte buffer.
 * The object has to implement interface Serializable or Externalizable or Streamable, or be one of the
 * types registered in TYPES (or a Class registered there).
 * @param obj The object to marshal; null is marshalled as TYPE_NULL
 * @return the marshalled object; the first byte is the type tag
 */
public static ByteArray objectToBuffer(Object obj) throws IOException {
    if(obj == null)
        return new ByteArray(TYPE_NULL_ARRAY);
    if(obj instanceof Streamable)
        return writeStreamable((Streamable)obj);
    // a Class is looked up by itself, every other object by its class
    final Object lookup_key=obj instanceof Class<?>? obj : obj.getClass();
    final Byte type=TYPES.get(lookup_key);
    if(type == null)
        return writeSerializable(obj);
    return new ByteArray(marshalPrimitiveType(type, obj));
}
/**
 * Marshals a Streamable: writes TYPE_STREAMABLE followed by the output of writeGenericStreamable().
 * @param obj The object to marshal
 * @return the buffer of the output stream
 */
protected static ByteArray writeStreamable(Streamable obj) throws IOException {
    // initial capacity: the serialized size if the object can report it, otherwise 512
    int capacity=512;
    if(obj instanceof SizeStreamable)
        capacity=((SizeStreamable)obj).serializedSize();
    final ByteArrayDataOutputStream sink=new ByteArrayDataOutputStream(capacity, true);
    sink.write(TYPE_STREAMABLE);
    writeGenericStreamable(obj, sink);
    return sink.getBuffer();
}
/**
 * Marshals an object with Java serialization: writes TYPE_SERIALIZABLE followed by the output of an
 * {@link ObjectOutputStream}.
 * @param obj The object to marshal
 * @return the buffer of the output stream
 */
protected static ByteArray writeSerializable(Object obj) throws IOException {
    final ByteArrayDataOutputStream sink=new ByteArrayDataOutputStream(512, true);
    sink.write(TYPE_SERIALIZABLE);
    try(ObjectOutputStream object_out=new ObjectOutputStream(new OutputStreamAdapter(sink))) {
        object_out.writeObject(obj);
        object_out.flush();
        // the buffer is fetched after the flush, but before the stream gets closed
        final ByteArray result=sink.getBuffer();
        return result;
    }
}
/**
 * Marshals an object whose class is registered in TYPES. The first byte of the result is the type tag
 * (TYPE_CLASS for classes), followed by the value. Ints and longs are written compressed; strings are
 * written via writeString(); byte arrays are written without a length field.
 * @param type The type tag found in TYPES for obj
 * @param obj The object to marshal
 * @return the marshalled object
 * @throws IllegalArgumentException if type is not one of the handled tags
 */
protected static byte[] marshalPrimitiveType(Byte type, Object obj) {
    if(obj instanceof Class<?>)
        return new byte[]{TYPE_CLASS, type};
    switch(type) {
        case TYPE_BOOLEAN: case TYPE_BOOLEAN_OBJ: {
            boolean value=(Boolean)obj;
            return value? TYPE_BOOLEAN_TRUE : TYPE_BOOLEAN_FALSE;
        }
        case TYPE_BYTE: case TYPE_BYTE_OBJ:
            return new byte[]{type, (byte)obj};
        case TYPE_CHAR: case TYPE_CHAR_OBJ: {
            byte[] encoded=new byte[Global.BYTE_SIZE *3];
            encoded[0]=type;
            Bits.writeChar((char)obj, encoded, 1);
            return encoded;
        }
        case TYPE_DOUBLE: case TYPE_DOUBLE_OBJ: {
            byte[] encoded=new byte[Global.BYTE_SIZE + Global.DOUBLE_SIZE];
            encoded[0]=type;
            Bits.writeDouble((double)obj, encoded, 1);
            return encoded;
        }
        case TYPE_FLOAT: case TYPE_FLOAT_OBJ: {
            byte[] encoded=new byte[Global.BYTE_SIZE + Global.FLOAT_SIZE];
            encoded[0]=type;
            Bits.writeFloat((float)obj, encoded, 1);
            return encoded;
        }
        case TYPE_INT: case TYPE_INT_OBJ: {
            int int_value=(int)obj;
            byte[] encoded=new byte[Global.BYTE_SIZE + Bits.size(int_value)];
            encoded[0]=type;
            Bits.writeIntCompressed(int_value, encoded, 1);
            return encoded;
        }
        case TYPE_LONG: case TYPE_LONG_OBJ: {
            long long_value=(long)obj;
            byte[] encoded=new byte[Global.BYTE_SIZE + Bits.size(long_value)];
            encoded[0]=type;
            Bits.writeLongCompressed(long_value, encoded, 1);
            return encoded;
        }
        case TYPE_SHORT: case TYPE_SHORT_OBJ: {
            byte[] encoded=new byte[Global.BYTE_SIZE + Global.SHORT_SIZE];
            encoded[0]=type;
            Bits.writeShort((short)obj, encoded, 1);
            return encoded;
        }
        case TYPE_STRING: {
            String str=(String)obj;
            ByteArrayDataOutputStream out=new ByteArrayDataOutputStream(str.length()*2 +3);
            try {
                out.writeByte(TYPE_STRING);
                writeString(str, out);
                // only the bytes written so far are returned, not the entire internal buffer
                return Arrays.copyOf(out.buffer(), out.position());
            }
            catch(IOException ex) {
                throw new RuntimeException(ex);
            }
        }
        case TYPE_BYTEARRAY: {
            byte[] payload=(byte[])obj;
            byte[] encoded=new byte[Global.BYTE_SIZE + payload.length];
            encoded[0]=TYPE_BYTEARRAY;
            System.arraycopy(payload, 0, encoded, 1, payload.length);
            return encoded;
        }
        default:
            throw new IllegalArgumentException("type " + type + " is invalid");
    }
}
/**
 * Writes a string to an output stream. A boolean is written first: if it is true (isAsciiString()
 * accepted the string), the length (int) and one byte per character follow; otherwise the string is
 * written with {@link DataOutput#writeUTF(String)}.
 * @param str The string to write
 * @param out The output stream
 */
public static void writeString(String str, DataOutput out) throws IOException {
    final boolean ascii_only=isAsciiString(str);
    out.writeBoolean(ascii_only);
    if(!ascii_only) {
        out.writeUTF(str);
        return;
    }
    final int num_chars=str.length();
    out.writeInt(num_chars);
    for(int pos=0; pos < num_chars; pos++)
        out.write((byte)str.charAt(pos));
}
/**
 * Reads a string written in the format produced by writeString(): a boolean, followed by either a
 * modified UTF-8 string (boolean is false) or a length (int) and one byte per character (boolean is true).
 * @param in The input stream
 * @return the string
 */
public static String readString(DataInput in) throws IOException {
    if(in.readBoolean() == false)
        return in.readUTF();
    byte[] tmp=new byte[in.readInt()];
    in.readFully(tmp);
    // The bytes are single-byte ASCII characters: decode them with an explicit charset, as the platform
    // default charset is not guaranteed to be ASCII-compatible
    return new String(tmp, java.nio.charset.StandardCharsets.US_ASCII);
}
/**
 * Computes the number of bytes needed to marshal an object: 1 byte for the type tag plus the size of
 * the value. Supported are null, SizeStreamable objects, and objects (or classes) registered in TYPES.
 * @param obj The object (may be null)
 * @return the number of bytes
 * @throws IllegalArgumentException if the type of obj is not supported
 */
public static int size(Object obj) {
    if(obj == null)
        return Global.BYTE_SIZE;
    if(obj instanceof SizeStreamable)
        return Global.BYTE_SIZE + Util.size((SizeStreamable)obj);
    final boolean is_class=obj instanceof Class<?>;
    Byte type=TYPES.get(is_class? obj : obj.getClass());
    // an unregistered type must not reach the switch: unboxing a null Byte would throw a bare NPE
    if(type == null)
        throw new IllegalArgumentException("type " + (is_class? obj : obj.getClass()) + " is not supported");
    if(is_class)
        return Global.BYTE_SIZE *2; // TYPE_CLASS plus the tag of the class
    int retval=Global.BYTE_SIZE;
    switch(type) {
        case TYPE_BOOLEAN: case TYPE_BOOLEAN_OBJ:
        case TYPE_BYTE: case TYPE_BYTE_OBJ:
            return retval+Global.BYTE_SIZE;
        case TYPE_SHORT: case TYPE_SHORT_OBJ:
        case TYPE_CHAR: case TYPE_CHAR_OBJ:
            return retval+Character.BYTES;
        case TYPE_LONG: case TYPE_LONG_OBJ:
        case TYPE_DOUBLE: case TYPE_DOUBLE_OBJ:
            return retval+Double.BYTES;
        case TYPE_INT: case TYPE_INT_OBJ:
        case TYPE_FLOAT: case TYPE_FLOAT_OBJ:
            return retval+Float.BYTES;
        case TYPE_STRING:
            String s=(String)obj;
            retval+=Global.BYTE_SIZE; // is_ascii
            if(isAsciiString(s))
                return retval+Global.INT_SIZE + s.length();
            else
                return retval+Bits.sizeUTF(s);
        case TYPE_BYTEARRAY:
            byte[] buf=(byte[])obj;
            return retval+Global.INT_SIZE + buf.length;
        default:
            throw new IllegalArgumentException("type " + type + " is invalid");
    }
}
public static void objectToStream(Object obj, DataOutput out) throws IOException {
if(obj == null) {
out.write(TYPE_NULL);
return;
}
if(obj instanceof Streamable) {
out.writeByte(TYPE_STREAMABLE);
writeGenericStreamable((Streamable)obj,out);
return;
}
Byte type=obj instanceof Class<?>? TYPES.get(obj) : TYPES.get(obj.getClass());
if(type == null) {
out.write(TYPE_SERIALIZABLE);
try(ObjectOutputStream tmp=new ObjectOutputStream(out instanceof ByteArrayDataOutputStream?
new OutputStreamAdapter((ByteArrayDataOutputStream)out) :
(OutputStream)out)) {
tmp.writeObject(obj);
return;
}
}
if(obj instanceof Class<?>) {
if(type != null) {
out.writeByte(TYPE_CLASS);
out.writeByte(type);
return;
}
}
out.writeByte(type);
switch(type) {
case TYPE_BOOLEAN: case TYPE_BOOLEAN_OBJ:
out.writeBoolean((Boolean)obj);
break;
case TYPE_BYTE: case TYPE_BYTE_OBJ:
out.writeByte((Byte)obj);
break;
case TYPE_CHAR: case TYPE_CHAR_OBJ:
out.writeChar((Character)obj);
break;
case TYPE_DOUBLE: case TYPE_DOUBLE_OBJ:
out.writeDouble((Double)obj);
break;
case TYPE_FLOAT: case TYPE_FLOAT_OBJ:
out.writeFloat((Float)obj);
break;
case TYPE_INT: case TYPE_INT_OBJ:
out.writeInt((Integer)obj);
break;
case TYPE_LONG: case TYPE_LONG_OBJ:
out.writeLong((Long)obj);
break;
case TYPE_SHORT: case TYPE_SHORT_OBJ:
out.writeShort((Short)obj);
break;
case TYPE_STRING:
writeString((String)obj, out);
break;
case TYPE_BYTEARRAY:
byte[] buf=(byte[])obj;
out.writeInt(buf.length);
out.write(buf, 0, buf.length);
break;
default:
throw new IllegalArgumentException("type " + type + " is invalid");
}