/
ENCRYPT.java
1263 lines (1090 loc) · 43 KB
/
ENCRYPT.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
package org.jgroups.protocols;
import org.jgroups.*;
import org.jgroups.annotations.GuardedBy;
import org.jgroups.annotations.MBean;
import org.jgroups.annotations.Property;
import org.jgroups.stack.Protocol;
import org.jgroups.util.MessageBatch;
import org.jgroups.util.QueueClosedException;
import org.jgroups.util.Util;
import javax.crypto.*;
import javax.crypto.spec.SecretKeySpec;
import java.io.*;
import java.security.*;
import java.security.cert.CertificateException;
import java.security.spec.X509EncodedKeySpec;
import java.util.List;
import java.util.Map;
import java.util.WeakHashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
* ENCRYPT layer. Encrypt and decrypt communication in JGroups
*
* This class can be used in two ways:
* <ul>
* <li> Option 1. Configured with a secretKey in a keystore so it can be used at
* any layer in JGroups without the need for a coordinator, or if you want
* protection against passive monitoring but do not want the key exchange
* overhead and complexity. In this mode all nodes must be distributed with the
* same keystore file.
* <li> Option 2. Configured with algorithms and key sizes. The ENCRYPT layer in
* this mode sould be placed above the GMS protocol in the configuration. The
* coordinator then chooses the secretkey which it distributes amongst all the
* peers. In this form, no keystore exists as the keys are distributed using a
* public/private key exchange. View changes that identify a new controller will
* result in a new session key being generated and then distributed to all
* peers. This overhead can be substantial in a an application with a reasonable
* peer churn.
* </ul>
* <p>
* <p>
* Each message is identified as encrypted with a specific encryption header
* which identifies the type of encrypt header and an MD5 digest that identifies
* the version of the key being used to encrypt/decrypt the messages.
* <p>
* <p>
* <h2>Option 1</h2>
* <br>
* This is the simplest option and can be used by simply inserting the
* Encryption layer at any point in the JGroups stack - it will encrypt all
* Events of a type MSG that have a non-null message buffer. The format of the
* entry in this form is:<br>
* <ENCRYPT key_store_name="defaultStore.keystore" store_password="changeit"
* alias="myKey"/><br>
* An example showing the keystore version can be found in
* the conf in a file called EncryptKeyStore.xml - along with a
* defaultStore.keystore file.<br>
* In order to use the ENCRYPT layer in this manner, it is necessary to have the
* secretKey already generated in a keystore file. The directory containing the
* keystore file must be on the application's classpath. You cannot create a
* SecretKey keystore file using the keytool application shipped with the JDK. A
* java file called KeyStoreGenerator is included in the demo package that can
* be used from the command line (or IDE) to generate a suitable keystore.
* <p>
* <p>
* <h2>Option 2</h2>
* <br>
* This option is suited to an application that does not ship with a known key
* but instead it is generated and distributed by the controller. The secret key
* is first generated by the controller (in JGroups terms). When a view change
* occurs, a peer will request the secret key by sending a key request with its
* own public key. The controller encrypts the secret key with this key and
* sends it back to the peer who then decrypts it and installs the key as its
* own secret key. <br>
* All encryption and decryption of messages is done using this key. When a peer
* receives a view change that shows a different keyserver, it will repeat this
* process - the view change event also trigger the ENCRYPT layer to queue up
* and down messages until the new key is installed. The previous keys are
* retained so that messages sent before the view change that are queued can be
* decrypted if the key is different. <br>
* An example EncryptNoKeyStore.xml is included in the conf file as a guide.
* <p>
* <p>
* <br>
* Note: the current version does not support the concept of perfect forward
* encryption (PFE) which means that if a peer leaves the group the keys are
* re-generated preventing the departed peer from decrypting future messages if
* it chooses to listen in on the group. This is not included as it really
* requires a suitable authentication scheme as well to make this feature useful
* as there is nothing to stop the peer rejoining and receiving the new key. A
* future release will address this issue.
*
* @author Steve Woodcock
* @author Bela Ban
*/
@MBean(description="Protocol which encrypts and decrypts cluster traffic")
public class ENCRYPT extends Protocol {
Observer observer;
interface Observer {
void up(Event evt);
void passUp(Event evt);
void down(Event evt);
void passDown(Event evt);
}
private static final String DEFAULT_SYM_ALGO="AES";
// address info
Address local_addr=null;
// keyserver address
Address keyServerAddr=null;
//used to see whether we are the key server
boolean keyServer=false;
/* ----------------------------------------- Properties -------------------------------------------------- */
// encryption properties in no supplied key mode
@Property(name="asym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider")
String asymProvider=null;
@Property(name="sym_provider", description="Cryptographic Service Provider. Default is Bouncy Castle Provider")
String symProvider=null;
@Property(name="asym_algorithm", description="Cipher engine transformation for asymmetric algorithm. Default is RSA")
String asymAlgorithm="RSA";
@Property(name="sym_algorithm", description="Cipher engine transformation for symmetric algorithm. Default is AES")
String symAlgorithm=DEFAULT_SYM_ALGO;
@Property(name="asym_init", description="Initial public/private key length. Default is 512")
int asymInit=512;
@Property(name="sym_init", description="Initial key length for matching symmetric algorithm. Default is 128")
int symInit=128;
// properties for functioning in supplied key mode
private boolean suppliedKey=false;
@Property(name="key_store_name", description="File on classpath that contains keystore repository")
String keyStoreName;
@Property(name="store_password", description="Password used to check the integrity/unlock the keystore. Change the default")
private String storePassword="changeit"; //JDK default
@Property(name="key_password", description="Password for recovering the key. Change the default")
private String keyPassword=null; // allows to assign keypwd=storepwd if not set (https://issues.jboss.org/browse/JGRP-1375)
@Property(name="alias", description="Alias used for recovering the key. Change the default")
private String alias="mykey"; // JDK default
// public/private Key
KeyPair Kpair; // to store own's public/private Key
// for client to store server's public Key
PublicKey serverPubKey=null;
// needed because we do simultaneous encode/decode with these ciphers - which
// would be a threading issue
Cipher symEncodingCipher;
@GuardedBy("decrypt_lock")
Cipher symDecodingCipher;
/** To synchronize access to symDecodingCipher */
protected final Lock decrypt_lock=new ReentrantLock();
// version filed for secret key
private String symVersion=null;
// dhared secret key to encrypt/decrypt messages
SecretKey secretKey=null;
// map to hold previous keys so we can decrypt some earlier messages if we need to
final Map<String,Cipher> keyMap=new WeakHashMap<String,Cipher>();
// queues to buffer data while we are swapping shared key
// or obtsining key for first time
private boolean queue_up=true;
private boolean queue_down=false;
// queue to hold upcoming messages while key negotiation is happening
private BlockingQueue<Message> upMessageQueue=new LinkedBlockingQueue<Message>();
// queue to hold downcoming messages while key negotiation is happening
private BlockingQueue<Message> downMessageQueue=new LinkedBlockingQueue<Message>();
// decrypting cypher for secret key requests
private Cipher asymCipher;
/** determines whether to encrypt the entire message, or just the buffer */
@Property
private boolean encrypt_entire_message=false;
public void setObserver(Observer o) {
observer=o;
}
/*
* GetAlgorithm: Get the algorithm name from "algorithm/mode/padding"
* taken m original ENCRYPT file
*/
private static String getAlgorithm(String s) {
int index=s.indexOf("/");
if(index == -1)
return s;
return s.substring(0, index);
}
public void init() throws Exception {
if(keyPassword == null && storePassword != null) {
keyPassword=storePassword;
if(log.isDebugEnabled())
log.debug("key_password used is same as store_password");
}
if(keyStoreName == null) {
initSymKey();
initKeyPair();
}
else {
initConfiguredKey();
}
initSymCiphers(symAlgorithm, getSecretKey());
}
/**
* Initialisation if a supplied key is defined in the properties. This
* supplied key must be in a keystore which can be generated using the
* keystoreGenerator file in demos. The keystore must be on the classpath to
* find it.
*
* @throws KeyStoreException
* @throws Exception
* @throws IOException
* @throws NoSuchAlgorithmException
* @throws CertificateException
* @throws UnrecoverableKeyException
*/
private void initConfiguredKey() throws Exception {
InputStream inputStream=null;
// must not use default keystore type - as does not support secret keys
KeyStore store=KeyStore.getInstance("JCEKS");
SecretKey tempKey=null;
try {
// load in keystore using this thread's classloader
inputStream=Thread.currentThread()
.getContextClassLoader()
.getResourceAsStream(keyStoreName);
if(inputStream == null)
inputStream=new FileInputStream(keyStoreName);
// we can't find a keystore here -
if(inputStream == null) {
throw new Exception("Unable to load keystore " + keyStoreName
+ " ensure file is on classpath");
}
// we have located a file lets load the keystore
try {
store.load(inputStream, storePassword.toCharArray());
// loaded keystore - get the key
tempKey=(SecretKey)store.getKey(alias, keyPassword.toCharArray());
}
catch(IOException e) {
throw new Exception("Unable to load keystore " + keyStoreName + ": " + e);
}
catch(NoSuchAlgorithmException e) {
throw new Exception("No Such algorithm " + keyStoreName + ": " + e);
}
catch(CertificateException e) {
throw new Exception("Certificate exception " + keyStoreName + ": " + e);
}
if(tempKey == null) {
throw new Exception("Unable to retrieve key '" + alias
+ "' from keystore "
+ keyStoreName);
}
//set the key here
setSecretKey(tempKey);
if(symAlgorithm.equals(DEFAULT_SYM_ALGO)) {
symAlgorithm=tempKey.getAlgorithm();
}
// set the fact we are using a supplied key
suppliedKey=true;
queue_down=false;
queue_up=false;
}
finally {
Util.close(inputStream);
}
}
/**
* Used to initialise the symmetric key if none is supplied in a keystore.
*
* @throws Exception
*/
public void initSymKey() throws Exception {
KeyGenerator keyGen=null;
// see if we have a provider specified
if(symProvider != null && !symProvider.trim().isEmpty()) {
keyGen=KeyGenerator.getInstance(getAlgorithm(symAlgorithm), symProvider);
}
else {
keyGen=KeyGenerator.getInstance(getAlgorithm(symAlgorithm));
}
// generate the key using the defined init properties
keyGen.init(symInit);
secretKey=keyGen.generateKey();
setSecretKey(secretKey);
if(log.isDebugEnabled())
log.debug(" Symmetric key generated ");
}
/**
* Initialises the Ciphers for both encryption and decryption using the
* generated or supplied secret key.
*
* @param algorithm
* @param secret
* @throws Exception
*/
private void initSymCiphers(String algorithm, SecretKey secret) throws Exception {
if(log.isDebugEnabled())
log.debug(" Initializing symmetric ciphers");
if(symProvider != null && !symProvider.trim().isEmpty()) {
symEncodingCipher=Cipher.getInstance(algorithm, symProvider);
symDecodingCipher=Cipher.getInstance(algorithm, symProvider);
}
else {
symEncodingCipher=Cipher.getInstance(algorithm);
symDecodingCipher=Cipher.getInstance(algorithm);
}
symEncodingCipher.init(Cipher.ENCRYPT_MODE, secret);
symDecodingCipher.init(Cipher.DECRYPT_MODE, secret);
//set the version
MessageDigest digest=MessageDigest.getInstance("MD5");
digest.reset();
digest.update(secret.getEncoded());
symVersion = byteArrayToHexString(digest.digest());
if(log.isDebugEnabled()) {
log.debug(" Initialized symmetric ciphers with secret key (" + symVersion.length() + " bytes)");
}
}
public static String byteArrayToHexString(byte[] b){
StringBuilder sb = new StringBuilder(b.length * 2);
for (int i = 0; i < b.length; i++){
int v = b[i] & 0xff;
if (v < 16) { sb.append('0'); }
sb.append(Integer.toHexString(v));
}
return sb.toString().toUpperCase();
}
/**
* Generates the public/private key pair from the init params
*
* @throws Exception
*/
public void initKeyPair() throws Exception {
// generate keys according to the specified algorithms
// generate publicKey and Private Key
KeyPairGenerator KpairGen=null;
if(asymProvider != null && !asymProvider.trim().isEmpty()) {
KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asymAlgorithm), asymProvider);
}
else {
KpairGen=KeyPairGenerator.getInstance(getAlgorithm(asymAlgorithm));
}
KpairGen.initialize(asymInit, new SecureRandom());
Kpair=KpairGen.generateKeyPair();
// set up the Cipher to decrypt secret key responses encrypted with our key
if(asymProvider != null && !asymProvider.trim().isEmpty())
asymCipher=Cipher.getInstance(asymAlgorithm, asymProvider);
else
asymCipher=Cipher.getInstance(asymAlgorithm);
asymCipher.init(Cipher.DECRYPT_MODE, Kpair.getPrivate());
if(log.isDebugEnabled())
log.debug(" asym algo initialized");
}
/** Just remove if you don't need to reset any state */
public void reset() {}
/* (non-Javadoc)
* @see org.jgroups.stack.Protocol#up(org.jgroups.Event)
*/
public Object up(Event evt) {
switch(evt.getType()) {
case Event.VIEW_CHANGE:
View view=(View)evt.getArg();
if(log.isDebugEnabled())
log.debug("new view: " + view);
if(!suppliedKey) {
handleViewChange(view, false);
}
break;
case Event.TMP_VIEW:
view=(View)evt.getArg();
if(!suppliedKey) {
// if a tmp_view then we are trying to become coordinator so
// make us keyserver
handleViewChange(view, true);
}
break;
// we try and decrypt all messages
case Event.MSG:
try {
handleUpMessage(evt);
}
catch(Exception e) {
log.warn("exception occurred decrypting message", e);
}
return null;
default:
break;
}
return passItUp(evt);
}
public void up(MessageBatch batch) {
for(Message msg: batch) {
if(msg.getLength() == 0 && !encrypt_entire_message)
continue;
EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
if(hdr == null) {
if(log.isTraceEnabled())
log.trace("dropping message as ENCRYPT header is null or has not been recognized, msg will not be passed up, " +
"headers are " + msg.printHeaders());
batch.remove(msg);
continue;
}
switch(hdr.getType()) {
case EncryptHeader.ENCRYPT:
// if msg buffer is empty, and we didn't encrypt the entire message, just pass up
if(!hdr.encrypt_entire_msg && msg.getLength() == 0)
break;
// if queueing then pass into queue to be dealt with later
if(queue_up) {
if(log.isTraceEnabled())
log.trace("queueing up message as no session key established: " + msg);
try {
upMessageQueue.put(msg);
}
catch(InterruptedException e) {
}
}
else {
// make sure we pass up any queued messages first
// could be more optimised but this can wait we only need this if not using supplied key
if(!suppliedKey) {
try {
drainUpQueue();
}
catch(Exception e) {
log.error("failed draining up queue", e);
}
}
// try and decrypt the message - we need to copy msg as we modify its
// buffer (http://jira.jboss.com/jira/browse/JGRP-538)
try {
Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy());
if(tmpMsg != null)
batch.replace(msg, tmpMsg);
else
log.warn("Unrecognised cipher discarding message");
}
catch(Exception e) {
log.error("failed decrypting message", e);
}
}
break;
default:
batch.remove(msg); // a control message will get handled by ENCRYPT and should not be passed up
handleUpEvent(msg, hdr);
break;
}
}
if(!batch.isEmpty())
up_prot.up(batch);
}
public Object passItUp(Event evt) {
if(observer != null)
observer.passUp(evt);
return up_prot != null? up_prot.up(evt) : null;
}
private synchronized void handleViewChange(View view, boolean makeServer) {
// if view is a bit broken set me as keyserver
List<Address> members = view.getMembers();
if (members == null || members.isEmpty() || members.get(0) == null) {
becomeKeyServer(local_addr, false);
return;
}
// otherwise get keyserver from view controller
Address tmpKeyServer=view.getMembers().get(0);
//I am new keyserver - either first member of group or old key server is no more and
// I have been voted new controller
if(makeServer || (tmpKeyServer.equals(local_addr) && (keyServerAddr == null || (!tmpKeyServer.equals(keyServerAddr))))) {
becomeKeyServer(tmpKeyServer, makeServer);
// a new keyserver has been set and it is not me
}
else if(keyServerAddr == null || (!tmpKeyServer.equals(keyServerAddr)) || (keyServer && !tmpKeyServer.equals(local_addr))) {
handleNewKeyServer(tmpKeyServer);
}
else {
if(log.isDebugEnabled())
log.debug("Membership has changed but I do not care");
}
}
/**
* Handles becoming server - resetting queue settings and setting keyserver
* address to be local address.
*
* @param tmpKeyServer
*/
private void becomeKeyServer(Address tmpKeyServer, boolean forced) {
keyServerAddr=tmpKeyServer;
keyServer=true;
if(log.isDebugEnabled() && !forced)
log.debug("[" + local_addr + "] I have become the new key server ");
queue_down=false;
queue_up=false;
}
/**
* Sets up the peer for a new keyserver - this is setting queueing to buffer
* messages until we have a new secret key from the key server and sending a
* key request to the new keyserver.
*
* @param newKeyServer
*/
private void handleNewKeyServer(Address newKeyServer) {
// start queueing until we have new key
// to make sure we are not sending with old key
queue_up=true;
queue_down=true;
// set new keyserver address
keyServerAddr=newKeyServer;
keyServer=false;
if(log.isDebugEnabled())
log.debug("[" + local_addr + "] " + keyServerAddr + " has become the new key server, sending key request to it");
// create a key request message
sendKeyRequest();
}
private void handleUpMessage(Event evt) throws Exception {
Message msg=(Message)evt.getArg();
if(msg == null || (msg.getLength() == 0 && !encrypt_entire_message)) {
passItUp(evt);
return;
}
EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
if(hdr == null) {
if(log.isTraceEnabled())
log.trace("dropping message as ENCRYPT header is null or has not been recognized, msg will not be passed up, " +
"headers are " + msg.printHeaders());
return;
}
if(log.isTraceEnabled())
log.trace("header received " + hdr);
switch(hdr.getType()) {
case EncryptHeader.ENCRYPT:
handleEncryptedMessage(msg, evt, hdr);
break;
default:
handleUpEvent(msg, hdr);
break;
}
}
protected void handleEncryptedMessage(Message msg, Event evt, EncryptHeader hdr) throws Exception {
// if msg buffer is empty, and we didn't encrypt the entire message, just pass up
if(!hdr.encrypt_entire_msg && msg.getLength() == 0) {
if(log.isTraceEnabled())
log.trace("passing up message as it has an empty buffer ");
passItUp(evt);
return;
}
// if queueing then pass into queue to be dealt with later
if(queue_up) {
if(log.isTraceEnabled())
log.trace("queueing up message as no session key established: " + msg);
upMessageQueue.put(msg);
}
else {
// make sure we pass up any queued messages first
// could be more optimised but this can wait we only need this if not using supplied key
if(!suppliedKey)
drainUpQueue();
// try and decrypt the message - we need to copy msg as we modify its
// buffer (http://jira.jboss.com/jira/browse/JGRP-538)
Message tmpMsg=decryptMessage(symDecodingCipher, msg.copy());
if(tmpMsg != null) {
if(log.isTraceEnabled())
log.trace("decrypted message " + tmpMsg);
passItUp(new Event(Event.MSG, tmpMsg));
}
else
log.warn("Unrecognised cipher discarding message");
}
}
protected void handleUpEvent(Message msg, EncryptHeader hdr) {
// check if we had some sort of encrypt control header if using supplied key we should not process it
if(suppliedKey) {
if(log.isWarnEnabled())
log.warn("We received an encrypt header of " + hdr.getType() + " while in configured mode");
return;
}
// see what sort of encrypt control message we have received
switch(hdr.getType()) {
// if a key request
case EncryptHeader.KEY_REQUEST:
if(log.isDebugEnabled())
log.debug("received a key request from peer");
// if a key request send response key back
try {
// extract peer's public key
PublicKey tmpKey=generatePubKey(msg.getBuffer());
// send back the secret key we have
sendSecretKey(getSecretKey(), tmpKey, msg.getSrc());
}
catch(Exception e) {
log.warn("unable to reconstitute peer's public key");
}
break;
case EncryptHeader.SECRETKEY:
if(log.isDebugEnabled())
log.debug("received a secretkey response from keyserver");
try {
SecretKey tmp=decodeKey(msg.getBuffer());
if(tmp == null)
sendKeyRequest(); // unable to understand response, let's try again
else {
// otherwise lets set the returned key as the shared key
setKeys(tmp, hdr.getVersion());
if(log.isDebugEnabled())
log.debug("Decoded secretkey response");
}
}
catch(Exception e) {
log.warn("unable to process received public key");
}
break;
default:
log.warn("Received ignored encrypt header of " + hdr.getType());
break;
}
}
/**
* used to drain the up queue - synchronized so we can call it safely
* despite access from potentially two threads at once
*
* @throws QueueClosedException
* @throws Exception
*/
private void drainUpQueue() throws Exception {
if(log.isTraceEnabled()) {
int size=upMessageQueue.size();
if(size > 0)
log.trace("draining " + size + " messages from the up queue");
}
Message tmp=null;
while((tmp=upMessageQueue.poll(0L, TimeUnit.MILLISECONDS)) != null) {
Message msg=decryptMessage(symDecodingCipher, tmp.copy());
if(msg != null)
passItUp(new Event(Event.MSG, msg));
else
log.warn("discarding message in queue up drain as cannot decode it");
}
}
/**
* Sets the keys for the app. and drains the queues - the drains could be
* called att he same time as the up/down messages calling in to the class
* so we may have an extra call to the drain methods but this slight expense
* is better than the alternative of waiting until the next message to
* trigger the drains which may never happen.
*
* @param key
* @param version
* @throws Exception
*/
private void setKeys(SecretKey key, String version) throws Exception {
// put the previous key into the map
// if the keys are already there then they will overwrite
keyMap.put(getSymVersion(), getSymDecodingCipher());
setSecretKey(key);
initSymCiphers(key.getAlgorithm(), key);
setSymVersion(version);
// drain the up queue
log.debug("setting queue up to false in setKeys");
queue_up=false;
drainUpQueue();
queue_down=false;
drainDownQueue();
}
/**
* Does the actual work for decrypting - if version does not match current
* cipher then tries to use previous cipher
*
* @param cipher
* @param msg
* @return
* @throws Exception
*/
private Message decryptMessage(Cipher cipher, Message msg) throws Exception {
EncryptHeader hdr=(EncryptHeader)msg.getHeader(this.id);
if(!hdr.getVersion().equals(getSymVersion())) {
log.warn("attempting to use stored cipher as message does not use current encryption version ");
cipher=keyMap.get(hdr.getVersion());
if(cipher == null) {
log.warn("Unable to find a matching cipher in previous key map");
return null;
}
else {
if(log.isTraceEnabled())
log.trace("decrypting using previous cipher version " + hdr.getVersion());
return _decrypt(cipher, msg, hdr.encrypt_entire_msg);
}
}
else {
// reset buffer with decrypted message
return _decrypt(cipher, msg, hdr.encrypt_entire_msg);
}
}
private Message _decrypt(Cipher cipher, Message msg, boolean decrypt_entire_msg) throws Exception {
byte[] decrypted_msg;
decrypt_lock.lock();
try {
decrypted_msg=cipher.doFinal(msg.getRawBuffer(), msg.getOffset(), msg.getLength());
}
finally {
decrypt_lock.unlock();
}
if(!decrypt_entire_msg) {
msg.setBuffer(decrypted_msg);
return msg;
}
Message ret=(Message)Util.streamableFromByteBuffer(Message.class, decrypted_msg);
if(ret.getDest() == null)
ret.setDest(msg.getDest());
if(ret.getSrc() == null)
ret.setSrc(msg.getSrc());
return ret;
}
/**
* @param secret
* @param pubKey
* @throws InvalidKeyException
* @throws IllegalStateException
* @throws IllegalBlockSizeException
* @throws BadPaddingException
*/
private void sendSecretKey(SecretKey secret, PublicKey pubKey, Address source) throws InvalidKeyException,
IllegalStateException,
IllegalBlockSizeException,
BadPaddingException,
NoSuchPaddingException,
NoSuchAlgorithmException,
NoSuchProviderException {
Message newMsg;
if(log.isDebugEnabled())
log.debug("encoding shared key ");
// create a cipher with peer's public key
Cipher tmp;
if (asymProvider != null && !asymProvider.trim().isEmpty())
tmp=Cipher.getInstance(asymAlgorithm, asymProvider);
else
tmp=Cipher.getInstance(asymAlgorithm);
tmp.init(Cipher.ENCRYPT_MODE,pubKey);
//encrypt current secret key
byte[] encryptedKey=tmp.doFinal(secret.getEncoded());
newMsg=new Message(source, local_addr, encryptedKey)
.putHeader(this.id, new EncryptHeader(EncryptHeader.SECRETKEY, getSymVersion()));
if(log.isDebugEnabled())
log.debug(" Sending version " + getSymVersion() + " encoded key to client");
passItDown(new Event(Event.MSG,newMsg));
}
/**
* @return Message
*/
private Message sendKeyRequest() {
// send client's public key to server and request
// server's public key
Message newMsg=new Message(keyServerAddr, local_addr, Kpair.getPublic().getEncoded())
.putHeader(this.id,new EncryptHeader(EncryptHeader.KEY_REQUEST,getSymVersion()));
passItDown(new Event(Event.MSG,newMsg));
return newMsg;
}
/* (non-Javadoc)
* @see org.jgroups.stack.Protocol#down(org.jgroups.Event)
*/
public Object down(Event evt) {
if(observer != null)
observer.down(evt);
switch(evt.getType()) {
case Event.MSG:
Message msg=(Message)evt.getArg();
try {
if(queue_down) {
if(log.isTraceEnabled())
log.trace("queueing down message as no session key established" + msg);
downMessageQueue.put(msg); // queue messages if we are waiting for a new key
}
else {
// make sure the down queue is drained first to keep ordering
if(!suppliedKey) {
drainDownQueue();
}
sendDown(msg);
}
}
catch(Exception e) {
log.warn("unable to send down event " + e);
}
return null;
case Event.VIEW_CHANGE:
View view=(View)evt.getArg();
if(log.isDebugEnabled())
log.debug("new view: " + view);
if(!suppliedKey) {
handleViewChange(view, false);
}
break;
case Event.SET_LOCAL_ADDRESS:
local_addr=(Address)evt.getArg();
if(log.isDebugEnabled())
log.debug("set local address to " + local_addr);
break;
case Event.TMP_VIEW:
view=(View)evt.getArg();
if(!suppliedKey) {
// if a tmp_view then we are trying to become coordinator so
// make us keyserver
handleViewChange(view, true);
}
break;
default:
break;
}
return down_prot.down(evt);
}
public Object passItDown(Event evt) {
if(observer != null)
observer.passDown(evt);
return down_prot != null? down_prot.down(evt) : null;
}
/**
* @throws Exception
* @throws QueueClosedException
*/
private void drainDownQueue() throws Exception {
if(log.isTraceEnabled()) {
int size=downMessageQueue.size();
if(size > 0)
log.trace("draining " + size + " messages from the down queue");
}
Message tmp=null;
while((tmp=downMessageQueue.poll(0L, TimeUnit.MILLISECONDS)) != null) {
sendDown(tmp);
}
}
/**
* @param msg
* @throws Exception
*/
private void sendDown(Message msg) throws Exception {
if(msg.getLength() == 0 && !encrypt_entire_message) {
passItDown(new Event(Event.MSG, msg));
return;
}
EncryptHeader hdr=new EncryptHeader(EncryptHeader.ENCRYPT, getSymVersion());
hdr.encrypt_entire_msg=this.encrypt_entire_message;
if(encrypt_entire_message) {
if(msg.getSrc() == null)
msg.setSrc(local_addr);
byte[] serialized_msg=Util.streamableToByteBuffer(msg);
byte[] encrypted_msg=encryptMessage(symEncodingCipher,
serialized_msg,
0,
serialized_msg.length);
// we need to exclude existing headers, they will be seen again when we decrypt and unmarshal the message
// on the receiver
Message tmp=msg.copy(false, false);
tmp.setBuffer(encrypted_msg);
if(tmp.getSrc() == null)
tmp.setSrc(local_addr);
tmp.putHeader(this.id, hdr);
passItDown(new Event(Event.MSG, tmp));
return;
}
// put our encrypt header on the message
msg.putHeader(this.id, hdr);
// copy neeeded because same message (object) may be retransmitted -> no double encryption
Message msgEncrypted=msg.copy(false);
msgEncrypted.setBuffer(encryptMessage(symEncodingCipher,
msg.getRawBuffer(),
msg.getOffset(),
msg.getLength()));
passItDown(new Event(Event.MSG, msgEncrypted));
}
/**
*
* @param cipher
* @param plain
* @return
* @throws Exception
*/