-
Notifications
You must be signed in to change notification settings - Fork 83
/
security.py
1551 lines (1347 loc) · 62.4 KB
/
security.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
import logging
import os
import secrets
import ssl
from base64 import urlsafe_b64encode
from datetime import datetime
from enum import Enum, auto
from ssl import DER_cert_to_PEM_cert, SSLContext, SSLError, VerifyMode
from typing import Dict, List, Optional, Tuple, Union, cast
from cryptography.exceptions import InvalidSignature, UnsupportedAlgorithm
from cryptography.hazmat.backends.openssl.backend import Backend
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.ec import (
SECP256R1,
EllipticCurvePrivateKey,
EllipticCurvePublicKey,
)
from cryptography.hazmat.primitives.asymmetric.utils import (
decode_dss_signature,
encode_dss_signature,
)
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives.hashes import SHA256, Hash, HashAlgorithm
from cryptography.hazmat.primitives.kdf.concatkdf import ConcatKDFHash
from cryptography.hazmat.primitives.serialization import (
Encoding,
PublicFormat,
load_der_private_key,
load_pem_private_key,
)
from cryptography.x509 import (
AuthorityInformationAccessOID,
Certificate,
ExtensionNotFound,
ExtensionOID,
NameOID,
extensions,
load_der_x509_certificate,
)
from cryptography.x509.ocsp import OCSPRequestBuilder
from iso15118.shared.exceptions import (
CertAttributeError,
CertChainLengthError,
CertExpiredError,
CertNotYetValidError,
CertRevokedError,
CertSignatureError,
DecryptionError,
EncryptionError,
InvalidProtocolError,
KeyTypeError,
OCSPServerNotFoundError,
PrivateKeyReadError,
)
from iso15118.shared.exi_codec import EXI
from iso15118.shared.messages.enums import Namespace, Protocol
from iso15118.shared.messages.iso15118_2.datatypes import (
CertificateChain as CertificateChainV2,
)
from iso15118.shared.messages.iso15118_2.datatypes import (
SubCertificates as SubCertificatesV2,
)
from iso15118.shared.messages.iso15118_20.common_messages import (
CertificateChain as CertificateChainV20,
)
from iso15118.shared.messages.iso15118_20.common_messages import SignedCertificateChain
from iso15118.shared.messages.iso15118_20.common_messages import (
SubCertificates as SubCertificatesV20,
)
from iso15118.shared.messages.xmldsig import (
CanonicalizationMethod,
DigestMethod,
Reference,
Signature,
SignatureMethod,
SignatureValue,
SignedInfo,
Transform,
Transforms,
)
from iso15118.shared.settings import SettingKey, shared_settings
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class KeyEncoding(str, Enum):
    """Encoding formats in which a private key file may be stored."""

    # Member values are generated by ``auto()``; load_priv_key() compares
    # against the members themselves (``KeyEncoding.PEM``), not against
    # literal values.
    PEM = auto()
    DER = auto()
def get_random_bytes(nbytes: int) -> bytes:
    """
    Returns a freshly generated random byte string.

    Args:
        nbytes: The number of random bytes to generate

    Returns:
        A bytes object of length nbytes, produced by secrets.token_bytes()
    """
    random_bytes: bytes = secrets.token_bytes(nbytes)
    return random_bytes
def get_ssl_context(server_side: bool) -> Optional[SSLContext]:
    """
    Creates an SSLContext object for the TCP client or TCP server.

    An SSL context holds various data longer-lived than single SSL
    connections, such as SSL configuration options, certificate(s) and
    private key(s). It also manages a cache of SSL sessions for
    server-side sockets, in order to speed up repeated connections from
    the same clients.

    The IANA cipher suite names
    - TLS_ECDH_ECDSA_WITH_AES_128_CBC_SHA256 and
    - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256
    (as given in ISO 15118-2) map to the OpenSSL cipher suite names
    - ECDH-ECDSA-AES128-SHA256 and
    - ECDHE-ECDSA-AES128-SHA256,
    respectively. See https://testssl.sh/openssl-iana.mapping.html

    TODO More/other cipher suites are allowed in ISO 15118-20

    Args:
        server_side: Whether this SSLContext object is for the TLS server (True)
                     or TLS client (False)

    Returns:
        An SSLContext object, or None if the certificate chain / private key
        needed for this side could not be loaded (the cause is logged).

    TODO We use the test PKI provided for the CharIN Testival Europe 2021.
         Need to figure out a way to securely store those certs and keys
         as well as read the password.
    """
    # Read the setting once so that every branch below sees the same value.
    tls_1_3_enabled = shared_settings[SettingKey.ENABLE_TLS_1_3]

    if tls_1_3_enabled:
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    else:
        # Specifying protocol as `PROTOCOL_TLS` does best effort.
        # TLSv1.3 will be attempted and would fallback to 1.2 if not possible.
        # However, there may be TLS clients that can't perform
        # 1.2 fallback, here we explicitly set the TLS version
        # to 1.2, to be sure we won't fall into connection issues
        ssl_context = SSLContext(protocol=ssl.PROTOCOL_TLSv1_2)

    if server_side:
        secc_chain_loaded = _load_tls_cert_chain(
            ssl_context,
            certfile=CertPath.CPO_CERT_CHAIN_PEM,
            keyfile=KeyPath.SECC_LEAF_PEM,
            password_path=KeyPasswordPath.SECC_LEAF_KEY_PASSWORD,
            owner="SECC",
            not_found_msg="Can't find certfile or keyfile for SSL context",
        )
        if not secc_chain_loaded:
            return None

        if tls_1_3_enabled:
            # In 15118-20 we should also verify EVCC's certificate chain.
            # The spec however says TLS 1.3 should also support 15118-2
            # (Table 5 in V2G20 specification)
            # Marc/André - this suggests we will need mutual auth 15118-2 if
            # TLS1.3 is enabled.
            ssl_context.load_verify_locations(cafile=CertPath.OEM_ROOT_PEM)
            ssl_context.verify_mode = VerifyMode.CERT_REQUIRED
        else:
            # In ISO 15118-2, we only verify the SECC's certificates
            ssl_context.verify_mode = VerifyMode.CERT_NONE

        # The SECC must support both ciphers defined in ISO 15118-20.
        # Calling .set_ciphers to be more evident about what is available.
        # Cipher suites for both 15118-20 and 15118-2 are provided to be
        # compatible with both 15118 families [V2G20-2059]. The order is as
        # specified in the specification [V2G20-1856]
        # TODO: A configuration mechanism could be provided to add/remove cipher
        #       suite in case where a vulnerability is identified with any of them
        ssl_context.set_ciphers(
            "TLS_AES_256_GCM_SHA384:"
            "TLS_CHACHA20_POLY1305_SHA256:"
            "ECDH-ECDSA-AES128-SHA256:"
            "ECDHE-ECDSA-AES128-SHA256"
        )
    else:
        # Load the V2G Root CA certificate(s) to validate the SECC's leaf and
        # Sub-CA CPO certificates. The cafile string is the path to a file of
        # concatenated (if several exist) V2G Root CA certificates in PEM format
        ssl_context.load_verify_locations(cafile=CertPath.V2G_ROOT_PEM)
        ssl_context.check_hostname = False
        ssl_context.verify_mode = VerifyMode.CERT_REQUIRED
        # In 15118-20, the EVCC must support all cipher suites in the spec
        # [V2G20-2459]. In 15118-2, the EVCC must support only one cipher suite,
        # so let's choose the more secure one (ECDHE enables perfect forward
        # secrecy)
        ssl_context.set_ciphers(
            "TLS_AES_256_GCM_SHA384:"
            "TLS_CHACHA20_POLY1305_SHA256:"
            "ECDHE-ECDSA-AES128-SHA256"
        )

        # With TLS 1.3 enabled the client presents the OEM certificate chain,
        # which the server side above verifies against the OEM root.
        if tls_1_3_enabled and not _load_tls_cert_chain(
            ssl_context,
            certfile=CertPath.OEM_CERT_CHAIN_PEM,
            keyfile=KeyPath.OEM_LEAF_PEM,
            password_path=KeyPasswordPath.OEM_LEAF_KEY_PASSWORD,
            owner="OEM",
            not_found_msg="Can't find OEM certfile or keyfile for SSL context",
        ):
            return None

    # The OpenSSL name for ECDH curve secp256r1 is prime256v1
    ssl_context.set_ecdh_curve("prime256v1")

    return ssl_context


def _load_tls_cert_chain(
    ssl_context: SSLContext,
    *,
    certfile: str,
    keyfile: str,
    password_path: str,
    owner: str,
    not_found_msg: str,
) -> bool:
    """
    Loads a certificate chain and its private key into ssl_context.

    Every failure is logged (with traceback) and reported through the return
    value instead of being raised, so that get_ssl_context() can return None.

    Args:
        ssl_context: The context to load the credentials into
        certfile: Path to the PEM certificate chain
        keyfile: Path to the PEM private key belonging to the leaf certificate
        password_path: Path to the file holding the private key's password
        owner: Name of the chain's owner, used in the SSLError log message
        not_found_msg: Log message to emit if a file cannot be found

    Returns:
        True if the chain was loaded, False otherwise
    """
    try:
        # The password is read inside the try block on purpose: a missing
        # password file must be handled like a missing cert or key file.
        ssl_context.load_cert_chain(
            certfile=certfile,
            keyfile=keyfile,
            password=load_priv_key_pass(password_path),
        )
    except SSLError:
        logger.exception(
            f"SSLError, can't load {owner} certificate chain for SSL "
            "context. Private key (keyfile) probably doesn't "
            "match certificate (certfile) or password for "
            "private key is invalid. Returning None instead."
        )
        return False
    except FileNotFoundError:
        logger.exception(not_found_msg)
        return False
    except Exception as exc:
        logger.exception(exc)
        return False
    return True
def load_priv_key_pass(
    password_path: str,
) -> Optional[bytes]:
    """
    Reads the password for the encrypted private key.

    TODO This is obviously not a secure way of storing and reading a password
         for a private key. Need to engage with security experts on how this
         would be implemented in a secure production environment

    Args:
        password_path: The file path to the password TXT file. If empty, a
                       built-in test PKI password is returned instead.

    Returns:
        The password as a UTF-8 encoded bytes object (first line of the file,
        with trailing whitespace removed), or None if that line is empty,
        which represents a private key without a passphrase.

    Raises:
        FileNotFoundError, IOError
    """
    if not password_path:
        # This must be the same password as used for creating the private keys
        # and certificates. See create_certs.sh or request the password from
        # the providers of another test PKI (e.g. CharIN Testivals).
        # NOTE(review): hard-coded test password; not suitable for production.
        return "12345".encode(encoding="utf-8")

    # Errors from open() propagate unchanged to the caller.
    with open(password_path, "r") as password_file:
        password = password_file.readline().rstrip().encode(encoding="utf-8")

    if password == b"":
        # TODO: Check if it is possible to have a private key with empty
        #  password. Not without a password - but a password like this: ""
        # Returning None to represent cases where there is no
        # passphrase set.
        return None
    return password
def load_priv_key(
    key_path: str, key_encoding: KeyEncoding, key_password_file_path: str
) -> EllipticCurvePrivateKey:
    """
    Loads a PEM or DER encoded private key given the provided key_path and
    returns the key as an EllipticCurvePrivateKey object.

    Args:
        key_path: The file path to the PEM or DER encoded private key
        key_encoding: The encoding format (KeyEncoding) of the private key
                      (PEM or DER).
        key_password_file_path: Path to the file where password is stored for
                                the private key (see load_priv_key_pass).

    Returns:
        An EllipticCurvePrivateKey object corresponding to the private key read

    Raises:
        PrivateKeyReadError: If the key file or the password file cannot be
                             read, the key cannot be decoded/decrypted, or the
                             key is not an elliptic curve private key.
    """
    try:
        with open(key_path, "rb") as key_file:
            key_bytes = key_file.read()
    except OSError as exc:
        raise PrivateKeyReadError(f"Key file not found at location {key_path}") from exc

    # The password is read separately so that a problem with the password file
    # is not misreported as a missing key file.
    try:
        password = load_priv_key_pass(key_password_file_path)
    except (OSError, UnicodeError) as exc:
        raise PrivateKeyReadError(
            "Password file for the private key could not be read at location "
            f"{key_password_file_path}"
        ) from exc

    if key_encoding == KeyEncoding.PEM:
        deserialize_key = load_pem_private_key
    else:
        deserialize_key = load_der_private_key

    try:
        priv_key = deserialize_key(key_bytes, password)
    except ValueError as exc:
        raise PrivateKeyReadError(
            "The PEM data could not be decrypted or its "
            "structure could not be decoded "
            "successfully."
        ) from exc
    except TypeError as exc:
        raise PrivateKeyReadError(
            "Either password was given and private key "
            "was not encrypted or key was encrypted "
            "but no password was supplied."
        ) from exc
    except UnsupportedAlgorithm as exc:
        raise PrivateKeyReadError(
            "Serialized key is of a type not supported by the crypto library."
        ) from exc

    if isinstance(priv_key, EllipticCurvePrivateKey):
        return priv_key

    # TODO Add support for other keys used in ISO 15118-20
    raise PrivateKeyReadError(
        f"Unknown key type at location {key_path}. "
        "Expected key of type EllipticCurvePrivateKey"
    )
def to_ec_pub_key(public_key_bytes: bytes) -> EllipticCurvePublicKey:
    """
    Takes a public key in bytes for the named elliptic curve secp256R1, as used
    in ISO 15118-2, and returns it as an instance of EllipticCurvePublicKey.

    Args:
        public_key_bytes: The elliptic curve public key, serialised as bytes.

    Returns:
        An instance of EllipticCurvePublicKey corresponding to the provided
        bytes object.

    Raises:
        ValueError, TypeError: Raised by
            EllipticCurvePublicKey.from_encoded_point(); they are logged here
            and then re-raised unchanged.

    TODO Need to make more flexible for other elliptic curves used in
         ISO 15118-20
    """
    try:
        return EllipticCurvePublicKey.from_encoded_point(
            curve=SECP256R1(), data=public_key_bytes
        )
    except ValueError:
        # Log through the module logger (not the root logger) so the record
        # carries this module's name like every other message in the file.
        logger.exception(
            "An invalid point is supplied, can't convert "
            "bytes to EllipticCurvePublicKey instance"
        )
        raise
    except TypeError:
        logger.exception(
            "Curve provided is not an EllipticCurve, can't "
            "convert bytes to EllipticCurvePublicKey instance"
        )
        raise
def load_cert(cert_path: str) -> bytes:
    """
    Reads a certificate file and hands back its raw content.

    The file is opened in binary mode and returned as-is; no parsing or
    validation takes place here.
    See https://docs.python.org/3/library/ssl.html#ssl-certificates for more
    information on how certificates work.

    Args:
        cert_path: The file path to the DER encoded certificate

    Returns:
        The content of the file, given as a bytes object

    Raises:
        FileNotFoundError, IOError
    """
    with open(cert_path, mode="rb") as der_file:
        der_bytes = der_file.read()
    return der_bytes
def load_cert_chain(
    protocol: Protocol,
    leaf_path: str,
    sub_ca2_path: str,
    sub_ca1_path: str = None,
    id: str = None,
) -> Union[CertificateChainV2, CertificateChainV20, SignedCertificateChain]:
    """
    Builds the certificate chain message element for the given protocol from
    certificate files on disk.

    See https://docs.python.org/3/library/ssl.html#certificate-chains for more
    information on how certificate chains work.

    Args:
        protocol: The ISO 15118 protocol version (-2 or -20)
        leaf_path: Path to the leaf certificate (e.g. contract certificate,
                   EVSE/SECC certificate, OEM provisioning certificate)
        sub_ca2_path: Path to the Sub-CA 2 certificate, whose public key is
                      used to verify the signature of the leaf certificate.
        sub_ca1_path: Path to the optional Sub-CA 1 certificate, whose public
                      key is used to verify the signature of the Sub-CA 2
                      certificate, in case two Sub-CA certificates are used.
                      If a Sub-CA 1 certificate is used, then this certificate
                      has been issued by the root CA certificate. If not, then
                      the Sub-CA 2 certificate has been issued by the root CA
                      certificate.
        id: The optional ID attribute, in case the certificate chain is part of
            the header's signature, in which case this function returns a
            SignedCertificateChain instead of a CertificateChain
            (ISO 15118-20 only).

    Returns:
        A CertificateChain instance for ISO 15118-2, or a CertificateChain /
        SignedCertificateChain instance for ISO 15118-20. The sub-CA list
        holds the Sub-CA 2 certificate first, followed by Sub-CA 1 if given.

    Raises:
        InvalidProtocolError
    """
    # All files are read before the protocol is inspected.
    leaf_der = load_cert(leaf_path)
    sub_ca2_der = load_cert(sub_ca2_path)
    sub_ca1_der = None
    if sub_ca1_path:
        sub_ca1_der = load_cert(sub_ca1_path)

    if protocol == Protocol.ISO_15118_2:
        v2_sub_certs: SubCertificatesV2 = SubCertificatesV2(
            certificates=[sub_ca2_der]
        )
        if sub_ca1_der:
            v2_sub_certs.certificates.append(sub_ca1_der)
        return CertificateChainV2(
            certificate=leaf_der, sub_certificates=v2_sub_certs
        )

    if not protocol.ns.startswith(Namespace.ISO_V20_BASE):
        raise InvalidProtocolError(f"'{protocol}' is not a valid Protocol enum")

    v20_sub_certs: SubCertificatesV20 = SubCertificatesV20(
        certificates=[sub_ca2_der]
    )
    if sub_ca1_der:
        v20_sub_certs.certificates.append(sub_ca1_der)

    if not id:
        return CertificateChainV20(
            certificate=leaf_der, sub_certificates=v20_sub_certs
        )

    # In ISO 15118-20, there's a distinction between a CertificateChain
    # and a SignedCertificateChain (which includes the id attribute).
    return SignedCertificateChain(
        id=id, certificate=leaf_der, sub_certificates=v20_sub_certs
    )
def log_certs_details(certs: List[bytes]):
    """
    Writes the key attributes of each certificate to the debug log.

    For every certificate the subject, issuer, serial number, validity period
    and fingerprint (computed with the certificate's own signature hash
    algorithm) are logged, followed by a "===" separator line.

    Args:
        certs: A list of DER encoded certificates
    """
    for der_bytes in certs:
        parsed = load_der_x509_certificate(der_bytes)
        logger.debug(f"Subject: {parsed.subject}")
        logger.debug(f"Issuer: {parsed.issuer}")
        logger.debug(f"Serial number: {parsed.serial_number}")
        valid_from, valid_until = parsed.not_valid_before, parsed.not_valid_after
        logger.debug(f"Validity: {valid_from} - {valid_until}")
        digest = parsed.fingerprint(parsed.signature_hash_algorithm)
        logger.debug(f"Fingerprint: {digest.hex(':')}")
        logger.debug("===")
def verify_certs(
    leaf_cert_bytes: bytes,
    sub_ca_certs_bytes: List[bytes],
    root_ca_cert_bytes: bytes,
    private_environment: bool = False,
):
    """
    Verifies a certificate chain according to the following criteria:
    1. Check that the current date is within the time span provided by the
       certificate's notBefore and notAfter attributes (leaf and sub-CA
       certificates)
    2. Verify the signature of each certificate contained in the cert chain
       (throws CertSignatureError if not)
       2.a) Get the sub_ca_certs in order: leaf -> sub_ca_2 -> sub_ca_1 -> root
            (if two sub-CAs are in use, otherwise: leaf -> sub_ca_2 -> root)
       2.b) Do the actual signature verification from leaf to root
    3. Checks that none of the certificates has been revoked
       (not implemented yet, see the TODO at the end of the function).

    If no root CA certificate is provided, only step 1 is performed and the
    function returns after logging that the chain can't be validated.

    Args:
        leaf_cert_bytes: The DER encoded leaf certificate
        sub_ca_certs_bytes: One or more DER encoded sub-CA certificates, which
                            are needed to verify the chain of signatures from
                            the leaf certificate all the way to the root CA
                            certificate. The order of the sub-CA certificates
                            doesn't matter, verify_certs will try to work with
                            either a sub-CA 1 or a sub-CA 2 certificate as
                            first list entry, if two sub-CA certificates are
                            present. No more than two sub-CA certificates are
                            allowed.
        root_ca_cert_bytes: The DER encoded root CA (certificate authority)
                            certificate, which is used to verify the signature
                            of the top-level sub-CA certificate
        private_environment: Whether or not the certificate chain to check is
                             that of a private environment (PE). In a PE, there
                             are no sub-CA certificates.

    Raises:
        CertSignatureError, CertNotYetValidError, CertExpiredError,
        CertRevokedError, CertAttributeError, CertChainLengthError, KeyTypeError
    """
    leaf_cert = load_der_x509_certificate(leaf_cert_bytes)
    sub_ca2_cert = None
    sub_ca1_cert = None
    root_ca_cert = None

    if root_ca_cert_bytes:
        root_ca_cert = load_der_x509_certificate(root_ca_cert_bytes)

    sub_ca_der_certs: List[Certificate] = [
        load_der_x509_certificate(cert) for cert in sub_ca_certs_bytes
    ]

    # Step 1: Check that each certificate is valid, i.e. the current time is
    #         between the notBefore and notAfter timestamps of the certificate.
    #         CertNotYetValidError / CertExpiredError propagate to the caller.
    check_validity([leaf_cert, *sub_ca_der_certs])

    if not root_ca_cert:
        logger.info("Can't validate the chain as MO root is not present.")
        return None

    # Step 2.a: Categorize the sub-CA certificates into sub-CA 1 and sub-CA 2.
    #           A sub-CA 2 certificate's profile has its PathLength extension
    #           attribute set to 0, whereas a sub-CA 1 certificate's profile has
    #           its PathLength extension attribute set to 1.
    #           Only a sub-CA 2 can issue a leaf certificate. If a sub-CA 1 is
    #           used, then it issues the certificate for a sub-CA 2 and has its
    #           certificate issued by the root CA. If no sub-CA 1 is used, then
    #           the root CA issues the sub-CA 2's certificate directly.
    # TODO We also need to check each certificate's attributes for
    #      compliance with the corresponding certificate profile
    for cert in sub_ca_der_certs:
        try:
            basic_constraints = cert.extensions.get_extension_for_oid(
                ExtensionOID.BASIC_CONSTRAINTS
            ).value
            path_len = 0
            if isinstance(basic_constraints, extensions.BasicConstraints):
                path_len = basic_constraints.path_length
        except ExtensionNotFound as exc:
            raise CertAttributeError(
                subject=cert.subject.__str__(), attr="PathLength", invalid_value="None"
            ) from exc

        if path_len == 0:
            if sub_ca2_cert:
                logger.error(
                    f"Sub-CA cert {sub_ca2_cert.subject.__str__()} "
                    "already has PathLength attribute set to 0. "
                    "A certificate chain must not contain two "
                    "certificates with the same path length"
                )
                raise CertAttributeError(
                    subject=cert.subject.__str__(), attr="PathLength", invalid_value="0"
                )
            sub_ca2_cert = cert
        elif path_len == 1:
            if sub_ca1_cert:
                logger.error(
                    f"Sub-CA cert {sub_ca1_cert.subject.__str__()} "
                    f"already has PathLength attribute set to 1. "
                    "A certificate chain must not contain two "
                    "certificates with the same path length"
                )
                raise CertAttributeError(
                    subject=cert.subject.__str__(), attr="PathLength", invalid_value="1"
                )
            sub_ca1_cert = cert
        else:
            raise CertChainLengthError(allowed_num_sub_cas=2, num_sub_cas=path_len)

    if not sub_ca2_cert and not private_environment:
        raise CertChainLengthError(allowed_num_sub_cas=2, num_sub_cas=0)

    if (sub_ca2_cert or sub_ca1_cert) and private_environment:
        raise CertChainLengthError(allowed_num_sub_cas=0, num_sub_cas=1)

    # Step 2.b: Now that we have established the right order of sub-CA
    #           certificates we can start verifying the signatures from leaf
    #           certificate to root CA certificate. Each entry pairs a
    #           certificate with the certificate of its issuer.
    verification_steps: List[Tuple[Certificate, Certificate]]
    if private_environment:
        verification_steps = [(leaf_cert, root_ca_cert)]
    elif not sub_ca2_cert:
        # Already excluded by the checks above; kept as an explicit guard.
        logger.error("Sub-CA 2 certificate missing in public cert chain")
        raise CertChainLengthError(allowed_num_sub_cas=2, num_sub_cas=0)
    elif sub_ca1_cert:
        verification_steps = [
            (leaf_cert, sub_ca2_cert),
            (sub_ca2_cert, sub_ca1_cert),
            (sub_ca1_cert, root_ca_cert),
        ]
    else:
        verification_steps = [
            (leaf_cert, sub_ca2_cert),
            (sub_ca2_cert, root_ca_cert),
        ]

    # cert_to_check always names the certificate currently being verified, so
    # that the error handlers below report the right certificate.
    cert_to_check = leaf_cert
    try:
        for cert_to_check, issuer_cert in verification_steps:
            _verify_cert_signature(cert_to_check, issuer_cert)
    except InvalidSignature as exc:
        raise CertSignatureError(
            subject=cert_to_check.subject.__str__(),
            issuer=cert_to_check.issuer.__str__(),
        ) from exc
    except UnsupportedAlgorithm as exc:
        cert_hash_algorithm: HashAlgorithm = cert_to_check.signature_hash_algorithm
        raise CertSignatureError(
            subject=cert_to_check.subject.__str__(),
            issuer=cert_to_check.issuer.__str__(),
            extra_info=f"UnsupportedAlgorithm for certificate "
            f"{cert_to_check.subject.__str__()}. "
            f"\nSignature hash algorithm: "
            f"{cert_hash_algorithm.name if cert_hash_algorithm else 'None'}"
            f"\nSignature algorithm: "
            f"{cert_to_check.signature_algorithm_oid}"
            # TODO This OpenSSL version may not be the compiled one
            #      that is actually used, need to check
            f"\nOpenSSL version: {Backend().openssl_version_text()}",
        ) from exc
    except KeyTypeError:
        # Documented exception: must reach the caller instead of being
        # swallowed by the generic handler below.
        raise
    except Exception as exc:
        # A verification that could not be completed must never be treated as
        # a successful one, so the failure is logged and reported as a
        # signature error rather than silently ignored.
        logger.exception(
            f"{exc.__class__.__name__} while verifying signature "
            f"of certificate {cert_to_check.subject}"
        )
        raise CertSignatureError(
            subject=cert_to_check.subject.__str__(),
            issuer=cert_to_check.issuer.__str__(),
            extra_info=f"{exc.__class__.__name__} while verifying signature",
        ) from exc

    # Step 3: Check the OCSP (Online Certificate Status Protocol) response to
    #         see whether or not a certificate has been revoked
    # TODO As OCSP is not supported for the CharIN Testival Europe 2021, we'll
    #      postpone that step a bit


def _verify_cert_signature(cert: Certificate, issuer_cert: Certificate) -> None:
    """
    Verifies that cert was signed with the private key belonging to the public
    key of issuer_cert, using ECDSA with SHA256.

    Raises:
        KeyTypeError: If the issuer's public key is not an elliptic curve key
        InvalidSignature: If the signature does not match
    """
    issuer_pub_key = issuer_cert.public_key()
    if not isinstance(issuer_pub_key, EllipticCurvePublicKey):
        # TODO Add support for ISO 15118-20 public key types
        raise KeyTypeError(f"Unexpected public key type {type(issuer_pub_key)}")

    issuer_pub_key.verify(
        cert.signature,
        cert.tbs_certificate_bytes,
        # TODO Find a way to read id dynamically from the certificate
        ec.ECDSA(SHA256()),
    )
def check_validity(certs: List[Certificate]):
    """
    Checks that the current time is between the notBefore and notAfter
    timestamps of each certificate provided in the list.

    Args:
        certs: A list of certificates, given as Certificate instances
               (from the cryptography library)

    Raises:
        CertNotYetValidError: If a certificate's notBefore lies in the future
        CertExpiredError: If a certificate's notAfter lies in the past
    """
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. The tzinfo is
    # stripped again because the value is compared against the certificates'
    # not_valid_before / not_valid_after timestamps.
    # NOTE(review): assumes those timestamps are naive UTC datetimes - confirm
    # against the cryptography version in use.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    for cert in certs:
        if cert.not_valid_before > now:
            raise CertNotYetValidError(cert.subject.__str__())
        if cert.not_valid_after < now:
            raise CertExpiredError(cert.subject.__str__())
def get_cert_cn(der_cert: bytes) -> str:
    """
    Extracts the Common Name ('CN') from the subject of a DER encoded
    certificate.

    If the subject carries several CN attributes, the last one returned by
    the cryptography library is used.

    Args:
        der_cert: A DER encoded certificate

    Returns:
        The Common Name as a string; a bytes value is decoded as UTF-8
    """
    certificate = load_der_x509_certificate(der_cert)
    common_names = certificate.subject.get_attributes_for_oid(NameOID.COMMON_NAME)
    common_name = common_names.pop().value
    if not isinstance(common_name, str):
        return common_name.decode("utf-8")
    return common_name
def get_cert_issuer_serial(cert_path: str) -> Tuple[str, int]:
    """
    Reads an X.509 certificate from file and returns the two attributes that
    together uniquely identify it: the issuer name and the serial number.

    Args:
        cert_path: The path to the DER encoded certificate

    Returns:
        A tuple with the first tuple entry being the issuer name (as string)
        and the second tuple entry being the serial number of the certificate
    """
    certificate = load_der_x509_certificate(load_cert(cert_path))
    issuer_name = str(certificate.issuer)
    serial_number = certificate.serial_number
    return issuer_name, serial_number
def create_signature(
    elements_to_sign: List[Tuple[str, bytes]], signature_key: EllipticCurvePrivateKey
) -> Signature:
    """
    Builds the Signature element that goes into the header of a V2GMessage.

    The work is done in two steps:
    1. For every element to sign, a Reference element holding the element's
       digest is created; all of them are collected in a SignedInfo element.
    2. The SignedInfo element is EXI encoded and signed with ECDSA/SHA256
       using the provided private key, which yields the SignatureValue.

    (Check Annex J, section J.2 in ISO 15118-2, for a reference of how to
    generate a signature)

    Args:
        elements_to_sign: A list of tuples [str, bytes], where the first entry
                          of each tuple is the Id field (XML attribute) and the
                          second entry is the EXI encoded bytes representation
                          of the element for which a Reference element in the
                          SignedInfo element of the V2GMessage header needs to
                          be created, as part of creating a digital signature.
        signature_key: The private key used to sign the EXI encoded SignedInfo
                       element (using ECDSA), which yields the SignatureValue
                       of the Signature element of the V2GMessage header.

    Returns:
        A Signature instance, containing the SignedInfo and SignatureValue
        elements that need to be placed in the header of a V2GMessage

    TODO We need to determine between ISO 15118-2 and -20 signatures. Probably
         need a 'protocol' parameter
    """
    canonical_exi_uri = "http://www.w3.org/TR/canonical-exi/"

    # 1. Step: Reference generation
    references: List[Reference] = [
        Reference(
            uri="#" + element_id,
            transforms=Transforms(transform=[Transform(algorithm=canonical_exi_uri)]),
            digest_method=DigestMethod(
                algorithm="http://www.w3.org/2001/04/xmlenc#sha256"
            ),
            digest_value=create_digest(element_exi),
        )
        for element_id, element_exi in elements_to_sign
    ]

    signed_info = SignedInfo(
        canonicalization_method=CanonicalizationMethod(algorithm=canonical_exi_uri),
        signature_method=SignatureMethod(
            algorithm="http://www.w3.org/2001/04/xmldsig-more#ecdsa-sha256"
        ),
        reference=references,
    )

    # 2. Step: Signature generation
    signed_info_exi = EXI().to_exi(signed_info, Namespace.XML_DSIG)
    der_signature = signature_key.sign(
        data=signed_info_exi, signature_algorithm=ec.ECDSA(SHA256())
    )

    # The cryptography library hands back a DER encoded signature, whereas
    # ISO 15118 expects the raw signature value. decode_dss_signature() yields
    # the two signature components r and s as integers:
    # https://cryptography.io/en/latest/hazmat/primitives/asymmetric/utils/#cryptography.hazmat.primitives.asymmetric.utils.decode_dss_signature # noqa
    signature_components = decode_dss_signature(der_signature)

    # Each component is serialised as a 32 byte big endian number and both are
    # concatenated (r first, then s) to form the 64 byte raw signature value.
    raw_signature_value = bytearray(
        b"".join(component.to_bytes(32, "big") for component in signature_components)
    )

    return Signature(
        signed_info=signed_info,
        signature_value=SignatureValue(value=raw_signature_value),
    )
def verify_signature(
signature: Signature,
elements_to_sign: List[Tuple[str, bytes]],
leaf_cert: bytes,
sub_ca_certs: List[bytes] = None,
root_ca_cert: bytes = None,
) -> bool:
"""
Verifies the signature contained in the Signature element of the V2GMessage
header. The following steps are required:
1. Iterate over all element IDs of the message which should have been signed
and find the respective Reference element in the SignedInfo element of
the message header. Calculate the message digest for each element and
compare with the received message digest in the SignedInfo's Reference
element. If the received and the calculated digests are equal, we can
continue with step 2.
2. Verify the signature by decrypting the signature value (using the public
key stored in the leaf_cert parameter) and comparing its value with
the EXI encoded and hashed SignedInfo element. If the values match, then
the signature is verified with the public key certificate.
3. The final step is to verify that all signatures in the chain of
certificates from leaf to root are valid. In the case of the
AuthorizationReq message, for example, we can skip this step if the
contract certificate chain from leaf to root was already checked when
receiving the PaymentDetailsReq message (which contains the contract
certificate and sub-CA certificate(s)).
Args:
signature: The Signature instance containing the Reference elements and
the SignatureValue needed to verify the signature.
elements_to_sign: A list of tuples [str, bytes], where the first entry
of each tuple is the Id field (XML attribute) and the
second entry is the EXI encoded bytes representation
of the element for which a Reference element in the
SignedInfo element of the V2GMessage header exists.
leaf_cert: The certificate whose public key is used to verify the
signature, i.e. to decrypt the encrypted SignatureValue
element and check the result with the EXI encoded and
hashed SignedInfo element.
sub_ca_certs: The sub-CA certificate(s) belonging to the leaf_cert.
If provided, then the root_ca_cert must also be provided.
root_ca_cert: Root CA certificate used to verify the signature of (one of)
the sub-CA certificate(s). If provided, then the
sub_ca_certs must also be provided.
Returns:
True, if the signature can be successfully verified, False otherwise.
"""
# 1. Step: Digest value check for each reference element
for id_attr, exi_encoded in elements_to_sign:
logger.debug(f"Verifying digest for element with ID '{id_attr}'")
calculated_digest = create_digest(exi_encoded)
message_digests_equal = False
for reference in signature.signed_info.reference:
if not reference.uri:
logger.error("Reference without URI element")
continue
if reference.uri == "#" + id_attr:
if calculated_digest == reference.digest_value:
message_digests_equal = True
logger.debug(
f"\nReceived digest of reference with ID {id_attr}: "
f"{reference.digest_value.hex().upper()}"
f"\nCalculated digest for reference: "
f"{calculated_digest.hex().upper()}"
f"\n=> Match: {message_digests_equal}"
)
if not message_digests_equal:
logger.error(f"Digest mismatch for element with ID '{id_attr}'")
return False
# 2. Step: Checking signature value
logger.debug("Verifying signature value for SignedInfo element")
pub_key = load_der_x509_certificate(leaf_cert).public_key()
# The signature value element corresponds to the encryption of the EXI encoded
# and then hashed signed_info element.
# Signed Info -> EXI encoding -> Hashing -> Encryption with private key => Signature Value # noqa: E501
# ATTENTION: The hashing and encryption operation is part of the
# ECDSA (Elliptic Curve Digital Signature Algorithm) operation.
# That is why we do NOT additionally hash the EXI encoded signed info element
# before we inject it to the `data` field of the `verify` method.
exi_encoded_signed_info = EXI().to_exi(signature.signed_info, Namespace.XML_DSIG)
# The verify method from cryptography expects the signature to be in DER encoded
# format. Please check: https://cryptography.io/en/latest/hazmat/primitives/asymmetric/ec/#cryptography.hazmat.primitives.asymmetric.ec.EllipticCurvePublicKey.verify # noqa: E501
# However, in ISO 15118 the signature value is exchanged in raw format.
# In order to convert the signature value to DER format, it is possible to use the
# encode_dss_signature from cryptography, but we need to provide the
# r and s values of the signature as ints.
# https://cryptography.io/en/latest/hazmat/primitives/asymmetric/utils/#cryptography.hazmat.primitives.asymmetric.utils.encode_dss_signature # noqa: E501
# The `r` and `s` values are the two integers that make up the ECDSA
# signature. The raw signature value is read as `r` in its first 32 bytes
# and `s` in the remaining bytes, both in big endian byte order
ec_r = int.from_bytes(signature.signature_value.value[:32], "big")
ec_s = int.from_bytes(signature.signature_value.value[32:], "big")
der_encoded_signature = encode_dss_signature(r=ec_r, s=ec_s)
try:
if isinstance(pub_key, EllipticCurvePublicKey):
pub_key.verify(
signature=der_encoded_signature,
data=exi_encoded_signed_info,
signature_algorithm=ec.ECDSA(SHA256()),
)
else:
# TODO Add support for ISO 15118-20 public key types
raise KeyTypeError(f"Unexpected public key type " f"{type(pub_key)}")
except InvalidSignature as e:
pub_key_bytes = pub_key.public_bytes(
encoding=Encoding.X962, format=PublicFormat.UncompressedPoint
)
logger.error(
f"Signature verification failed for signature value "
f"\n{signature.signature_value.value.hex().upper()} \n"
f"Pub Key from Leaf Certificate: {pub_key_bytes.hex().upper()}"
f"\n Error: {e} "
)
return False
# 3. Step: Verify signatures along the certificate chain, if provided
if sub_ca_certs and root_ca_cert:
try:
verify_certs(leaf_cert, sub_ca_certs, root_ca_cert)
except (
CertSignatureError,
CertNotYetValidError,
CertExpiredError,
CertRevokedError,
CertAttributeError,
CertChainLengthError,
) as exc:
logger.error(
f"{exc.__class__.__name__}: Signature verification "
f"failed while checking certificate chain"
)
return False
else:
logger.warning(
"Sub-CA and root CA certificates were not used to "
"verify signatures along the certificate chain"
)