-
Notifications
You must be signed in to change notification settings - Fork 96
/
pgp.py
2527 lines (2004 loc) · 96.6 KB
/
pgp.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
""" pgp.py
this is where the armorable PGP block objects live
"""
import binascii
import calendar
import collections
import contextlib
import copy
import functools
import itertools
import operator
import os
import re
import warnings
import weakref
import six
from datetime import datetime
from cryptography.hazmat.primitives import hashes
from .constants import CompressionAlgorithm
from .constants import Features
from .constants import HashAlgorithm
from .constants import ImageEncoding
from .constants import KeyFlags
from .constants import NotationDataFlags
from .constants import PacketTag
from .constants import PubKeyAlgorithm
from .constants import RevocationKeyClass
from .constants import RevocationReason
from .constants import SignatureType
from .constants import SymmetricKeyAlgorithm
from .decorators import KeyAction
from .errors import PGPDecryptionError
from .errors import PGPError
from .packet import Key
from .packet import MDC
from .packet import Packet
from .packet import Primary
from .packet import Private
from .packet import PubKeyV4
from .packet import PubSubKeyV4
from .packet import PrivKeyV4
from .packet import PrivSubKeyV4
from .packet import Public
from .packet import Sub
from .packet import UserID
from .packet import UserAttribute
from .packet.packets import CompressedData
from .packet.packets import IntegrityProtectedSKEData
from .packet.packets import IntegrityProtectedSKEDataV1
from .packet.packets import LiteralData
from .packet.packets import OnePassSignature
from .packet.packets import OnePassSignatureV3
from .packet.packets import PKESessionKey
from .packet.packets import PKESessionKeyV3
from .packet.packets import Signature
from .packet.packets import SignatureV4
from .packet.packets import SKEData
from .packet.packets import Marker
from .packet.packets import SKESessionKey
from .packet.packets import SKESessionKeyV4
from .packet.types import Opaque
from .types import Armorable
from .types import Fingerprint
from .types import ParentRef
from .types import PGPObject
from .types import SignatureVerification
from .types import SorteDeque
__all__ = ['PGPSignature',
'PGPUID',
'PGPMessage',
'PGPKey',
'PGPKeyring']
class PGPSignature(Armorable, ParentRef, PGPObject):
@property
def __sig__(self):
return self._signature.signature.__sig__()
@property
def cipherprefs(self):
"""
A ``list`` of preferred symmetric algorithms specified in this signature, if any. Otherwise, an empty ``list``.
"""
if 'PreferredSymmetricAlgorithms' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredSymmetricAlgorithms'])).flags
return []
@property
def compprefs(self):
"""
A ``list`` of preferred compression algorithms specified in this signature, if any. Otherwise, an empty ``list``.
"""
if 'PreferredCompressionAlgorithms' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredCompressionAlgorithms'])).flags
return []
@property
def created(self):
"""
A :py:obj:`~datetime.datetime` of when this signature was created.
"""
return self._signature.subpackets['h_CreationTime'][-1].created
@property
def embedded(self):
return self.parent is not None
@property
def expires_at(self):
"""
A :py:obj:`~datetime.datetime` of when this signature expires, if a signature expiration date is specified.
Otherwise, ``None``
"""
if 'SignatureExpirationTime' in self._signature.subpackets:
expd = next(iter(self._signature.subpackets['SignatureExpirationTime'])).expires
return self.created + expd
return None
@property
def exportable(self):
"""
``False`` if this signature is marked as being not exportable. Otherwise, ``True``.
"""
if 'ExportableCertification' in self._signature.subpackets:
return bool(next(iter(self._signature.subpackets['ExportableCertification'])))
return True
@property
def features(self):
"""
A ``set`` of implementation features specified in this signature, if any. Otherwise, an empty ``set``.
"""
if 'Features' in self._signature.subpackets:
return next(iter(self._signature.subpackets['Features'])).flags
return set()
@property
def hash2(self):
return self._signature.hash2
@property
def hashprefs(self):
"""
A ``list`` of preferred hash algorithms specified in this signature, if any. Otherwise, an empty ``list``.
"""
if 'PreferredHashAlgorithms' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredHashAlgorithms'])).flags
return []
@property
def hash_algorithm(self):
"""
The :py:obj:`~constants.HashAlgorithm` used when computing this signature.
"""
return self._signature.halg
@property
def is_expired(self):
"""
``True`` if the signature has an expiration date, and is expired. Otherwise, ``False``
"""
expires_at = self.expires_at
if expires_at is not None and expires_at != self.created:
return expires_at < datetime.utcnow()
return False
@property
def key_algorithm(self):
"""
The :py:obj:`~constants.PubKeyAlgorithm` of the key that generated this signature.
"""
return self._signature.pubalg
@property
def key_expiration(self):
if 'KeyExpirationTime' in self._signature.subpackets:
return next(iter(self._signature.subpackets['KeyExpirationTime'])).expires
return None
@property
def key_flags(self):
"""
A ``set`` of :py:obj:`~constants.KeyFlags` specified in this signature, if any. Otherwise, an empty ``set``.
"""
if 'KeyFlags' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_KeyFlags'])).flags
return set()
@property
def keyserver(self):
"""
The preferred key server specified in this signature, if any. Otherwise, an empty ``str``.
"""
if 'PreferredKeyServer' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_PreferredKeyServer'])).uri
return ''
@property
def keyserverprefs(self):
"""
A ``list`` of :py:obj:`~constants.KeyServerPreferences` in this signature, if any. Otherwise, an empty ``list``.
"""
if 'KeyServerPreferences' in self._signature.subpackets:
return next(iter(self._signature.subpackets['h_KeyServerPreferences'])).flags
return []
@property
def magic(self):
return "SIGNATURE"
@property
def notation(self):
"""
A ``dict`` of notation data in this signature, if any. Otherwise, an empty ``dict``.
"""
return dict((nd.name, nd.value) for nd in self._signature.subpackets['NotationData'])
@property
def policy_uri(self):
"""
The policy URI specified in this signature, if any. Otherwise, an empty ``str``.
"""
if 'Policy' in self._signature.subpackets:
return next(iter(self._signature.subpackets['Policy'])).uri
return ''
@property
def revocable(self):
"""
``False`` if this signature is marked as being not revocable. Otherwise, ``True``.
"""
if 'Revocable' in self._signature.subpackets:
return bool(next(iter(self._signature.subpackets['Revocable'])))
return True
@property
def revocation_key(self):
if 'RevocationKey' in self._signature.subpackets:
raise NotImplementedError()
return None
@property
def signer(self):
"""
The 16-character Key ID of the key that generated this signature.
"""
return self._signature.signer
@property
def target_signature(self):
return NotImplemented
@property
def type(self):
"""
The :py:obj:`~constants.SignatureType` of this signature.
"""
return self._signature.sigtype
@classmethod
def new(cls, sigtype, pkalg, halg, signer):
sig = PGPSignature()
sigpkt = SignatureV4()
sigpkt.header.tag = 2
sigpkt.header.version = 4
sigpkt.subpackets.addnew('CreationTime', hashed=True, created=datetime.utcnow())
sigpkt.subpackets.addnew('Issuer', _issuer=signer)
sigpkt.sigtype = sigtype
sigpkt.pubalg = pkalg
if halg is not None:
sigpkt.halg = halg
sig._signature = sigpkt
return sig
def __init__(self):
"""
PGPSignature objects represent OpenPGP compliant signatures.
PGPSignature implements the ``__str__`` method, the output of which will be the signature object in
OpenPGP-compliant ASCII-armored format.
PGPSignature implements the ``__bytes__`` method, the output of which will be the signature object in
OpenPGP-compliant binary format.
"""
super(PGPSignature, self).__init__()
self._signature = None
def __bytearray__(self):
return self._signature.__bytearray__()
def __repr__(self):
return "<PGPSignature [{:s}] object at 0x{:02x}>".format(self.type.name, id(self))
def __lt__(self, other):
return self.created < other.created
def __or__(self, other):
if isinstance(other, Signature):
if self._signature is None:
self._signature = other
return self
##TODO: this is not a great way to do this
if other.__class__.__name__ == 'EmbeddedSignature':
self._signature = other
return self
raise TypeError
def __copy__(self):
# because the default shallow copy isn't actually all that useful,
# and deepcopy does too much work
sig = super(PGPSignature, self).__copy__()
# sig = PGPSignature()
# sig.ascii_headers = self.ascii_headers.copy()
sig |= copy.copy(self._signature)
return sig
def hashdata(self, subject):
_data = bytearray()
if isinstance(subject, six.string_types):
subject = subject.encode('latin-1')
"""
All signatures are formed by producing a hash over the signature
data, and then using the resulting hash in the signature algorithm.
"""
if self.type == SignatureType.BinaryDocument:
"""
For binary document signatures (type 0x00), the document data is
hashed directly.
"""
_data += bytearray(subject)
if self.type == SignatureType.CanonicalDocument:
"""
For text document signatures (type 0x01), the
document is canonicalized by converting line endings to <CR><LF>,
and the resulting data is hashed.
"""
_data += re.subn(br'\r?\n', b'\r\n', subject)[0]
if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
SignatureType.Positive_Cert, SignatureType.CertRevocation, SignatureType.Subkey_Binding,
SignatureType.PrimaryKey_Binding}:
"""
When a signature is made over a key, the hash data starts with the
octet 0x99, followed by a two-octet length of the key, and then body
of the key packet. (Note that this is an old-style packet header for
a key packet with two-octet length.) ...
Key revocation signatures (types 0x20 and 0x28)
hash only the key being revoked.
"""
_s = b''
if isinstance(subject, PGPUID):
_s = subject._parent.hashdata
elif isinstance(subject, PGPKey) and not subject.is_primary:
_s = subject._parent.hashdata
elif isinstance(subject, PGPKey) and subject.is_primary:
_s = subject.hashdata
if len(_s) > 0:
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
if self.type in {SignatureType.Subkey_Binding, SignatureType.PrimaryKey_Binding}:
"""
A subkey binding signature
(type 0x18) or primary key binding signature (type 0x19) then hashes
the subkey using the same format as the main key (also using 0x99 as
the first octet).
"""
if subject.is_primary:
_s = subject.subkeys[self.signer].hashdata
else:
_s = subject.hashdata
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
if self.type in {SignatureType.KeyRevocation, SignatureType.SubkeyRevocation, SignatureType.DirectlyOnKey}:
"""
The signature is calculated directly on the key being revoked. A
revoked key is not to be used. Only revocation signatures by the
key being revoked, or by an authorized revocation key, should be
considered valid revocation signatures.
Subkey revocation signature
The signature is calculated directly on the subkey being revoked.
A revoked subkey is not to be used. Only revocation signatures
by the top-level signature key that is bound to this subkey, or
by an authorized revocation key, should be considered valid
revocation signatures.
Signature directly on a key
This signature is calculated directly on a key. It binds the
information in the Signature subpackets to the key, and is
appropriate to be used for subpackets that provide information
about the key, such as the Revocation Key subpacket. It is also
appropriate for statements that non-self certifiers want to make
about the key itself, rather than the binding between a key and a
name.
"""
_s = subject.hashdata
_data += b'\x99' + self.int_to_bytes(len(_s), 2) + _s
if self.type in {SignatureType.Generic_Cert, SignatureType.Persona_Cert, SignatureType.Casual_Cert,
SignatureType.Positive_Cert, SignatureType.CertRevocation}:
"""
A certification signature (type 0x10 through 0x13) hashes the User
ID being bound to the key into the hash context after the above
data. ... A V4 certification
hashes the constant 0xB4 for User ID certifications or the constant
0xD1 for User Attribute certifications, followed by a four-octet
number giving the length of the User ID or User Attribute data, and
then the User ID or User Attribute data.
...
The [certificate revocation] signature
is computed over the same data as the certificate that it
revokes, and should have a later creation date than that
certificate.
"""
_s = subject.hashdata
if subject.is_uid:
_data += b'\xb4'
else:
_data += b'\xd1'
_data += self.int_to_bytes(len(_s), 4) + _s
# if this is a new signature, do update_hlen
if 0 in list(self._signature.signature):
self._signature.update_hlen()
"""
Once the data body is hashed, then a trailer is hashed. (...)
A V4 signature hashes the packet body
starting from its first field, the version number, through the end
of the hashed subpacket data. Thus, the fields hashed are the
signature version, the signature type, the public-key algorithm, the
hash algorithm, the hashed subpacket length, and the hashed
subpacket body.
V4 signatures also hash in a final trailer of six octets: the
version of the Signature packet, i.e., 0x04; 0xFF; and a four-octet,
big-endian number that is the length of the hashed data from the
Signature packet (note that this number does not include these final
six octets).
"""
hcontext = bytearray()
hcontext.append(self._signature.header.version if not self.embedded else self._signature._sig.header.version)
hcontext.append(self.type)
hcontext.append(self.key_algorithm)
hcontext.append(self.hash_algorithm)
hcontext += self._signature.subpackets.__hashbytearray__()
hlen = len(hcontext)
_data += hcontext
_data += b'\x04\xff'
_data += self.int_to_bytes(hlen, 4)
return bytes(_data)
def make_onepass(self):
onepass = OnePassSignatureV3()
onepass.sigtype = self.type
onepass.halg = self.hash_algorithm
onepass.pubalg = self.key_algorithm
onepass.signer = self.signer
onepass.update_hlen()
return onepass
def parse(self, packet):
unarmored = self.ascii_unarmor(packet)
data = unarmored['body']
if unarmored['magic'] is not None and unarmored['magic'] != 'SIGNATURE':
raise ValueError('Expected: SIGNATURE. Got: {}'.format(str(unarmored['magic'])))
if unarmored['headers'] is not None:
self.ascii_headers = unarmored['headers']
# load *one* packet from data
pkt = Packet(data)
if pkt.header.tag == PacketTag.Signature and not isinstance(pkt, Opaque):
self._signature = pkt
else:
raise ValueError('Expected: Signature. Got: {:s}'.format(pkt.__class__.__name__))
class PGPUID(ParentRef):
@property
def __sig__(self):
return list(self._signatures)
@property
def name(self):
"""If this is a User ID, the stored name. If this is not a User ID, this will be an empty string."""
return self._uid.name if isinstance(self._uid, UserID) else ""
@property
def comment(self):
"""
If this is a User ID, this will be the stored comment. If this is not a User ID, or there is no stored comment,
this will be an empty string.,
"""
return self._uid.comment if isinstance(self._uid, UserID) else ""
@property
def email(self):
"""
If this is a User ID, this will be the stored email address. If this is not a User ID, or there is no stored
email address, this will be an empty string.
"""
return self._uid.email if isinstance(self._uid, UserID) else ""
@property
def image(self):
"""
If this is a User Attribute, this will be the stored image. If this is not a User Attribute, this will be ``None``.
"""
return self._uid.image.image if isinstance(self._uid, UserAttribute) else None
@property
def is_primary(self):
"""
If the most recent, valid self-signature specifies this as being primary, this will be True. Otherwise, Faqlse.
"""
return bool(next(iter(self.selfsig._signature.subpackets['h_PrimaryUserID']), False))
@property
def is_uid(self):
"""
``True`` if this is a User ID, otherwise False.
"""
return isinstance(self._uid, UserID)
@property
def is_ua(self):
"""
``True`` if this is a User Attribute, otherwise False.
"""
return isinstance(self._uid, UserAttribute)
@property
def selfsig(self):
"""
This will be the most recent, self-signature of this User ID or Attribute. If there isn't one, this will be ``None``.
"""
if self.parent is not None:
return next((sig for sig in reversed(self._signatures) if sig.signer == self.parent.fingerprint.keyid), None)
@property
def signers(self):
"""
This will be a set of all of the key ids which have signed this User ID or Attribute.
"""
return set(s.signer for s in self.__sig__)
@property
def hashdata(self):
if self.is_uid:
return self._uid.__bytearray__()[len(self._uid.header):]
if self.is_ua:
return self._uid.subpackets.__bytearray__()
@classmethod
def new(cls, pn, comment="", email=""):
"""
Create a new User ID or photo.
:param pn: User ID name, or photo. If this is a ``bytearray``, it will be loaded as a photo.
Otherwise, it will be used as the name field for a User ID.
:type pn: ``bytearray``, ``str``, ``unicode``
:param comment: The comment field for a User ID. Ignored if this is a photo.
:type comment: ``str``, ``unicode``
:param email: The email address field for a User ID. Ignored if this is a photo.
:type email: ``str``, ``unicode``
:returns: :py:obj:`PGPUID`
"""
uid = PGPUID()
if isinstance(pn, bytearray):
uid._uid = UserAttribute()
uid._uid.image.image = pn
uid._uid.image.iencoding = ImageEncoding.encodingof(pn)
uid._uid.update_hlen()
else:
uid._uid = UserID()
uid._uid.name = pn
uid._uid.comment = comment
uid._uid.email = email
uid._uid.update_hlen()
return uid
def __init__(self):
"""
PGPUID objects represent User IDs and User Attributes for keys.
PGPUID implements the ``__format__`` method for User IDs, returning a string in the format
'name (comment) <email>', leaving out any comment or email fields that are not present.
"""
super(PGPUID, self).__init__()
self._uid = None
self._signatures = SorteDeque()
def __repr__(self):
if self.selfsig is not None:
return "<PGPUID [{:s}][{}] at 0x{:02X}>".format(self._uid.__class__.__name__, self.selfsig.created, id(self))
return "<PGPUID [{:s}] at 0x{:02X}>".format(self._uid.__class__.__name__, id(self))
def __lt__(self, other): # pragma: no cover
if self.is_uid == other.is_uid:
if self.is_primary == other.is_primary:
return self.selfsig > other.selfsig
if self.is_primary:
return True
return False
if self.is_uid and other.is_ua:
return True
if self.is_ua and other.is_uid:
return False
def __or__(self, other):
if isinstance(other, PGPSignature):
self._signatures.insort(other)
if self.parent is not None and self in self.parent._uids:
self.parent._uids.resort(self)
return self
if isinstance(other, UserID) and self._uid is None:
self._uid = other
return self
if isinstance(other, UserAttribute) and self._uid is None:
self._uid = other
return self
raise TypeError("unsupported operand type(s) for |: '{:s}' and '{:s}'"
"".format(self.__class__.__name__, other.__class__.__name__))
def __copy__(self):
# because the default shallow copy isn't actually all that useful,
# and deepcopy does too much work
uid = PGPUID()
uid |= copy.copy(self._uid)
for sig in self._signatures:
uid |= copy.copy(sig)
return uid
def __format__(self, format_spec):
if self.is_uid:
comment = six.u("") if self.comment == "" else six.u(" ({:s})").format(self.comment)
email = six.u("") if self.email == "" else six.u(" <{:s}>").format(self.email)
return six.u("{:s}{:s}{:s}").format(self.name, comment, email)
raise NotImplementedError
class PGPMessage(Armorable, PGPObject):
@staticmethod
def dash_unescape(text):
return re.subn(r'^- -', '-', text, flags=re.MULTILINE)[0]
@staticmethod
def dash_escape(text):
return re.subn(r'^-', '- -', text, flags=re.MULTILINE)[0]
@property
def encrypters(self):
"""A ``set`` containing all key ids (if any) to which this message was encrypted."""
return set(m.encrypter for m in self._sessionkeys if isinstance(m, PKESessionKey))
@property
def filename(self):
"""If applicable, returns the original filename of the message. Otherwise, returns an empty string."""
if self.type == 'literal':
return self._message.filename
return ''
@property
def is_compressed(self):
"""``True`` if this message will be compressed when exported"""
return self._compression != CompressionAlgorithm.Uncompressed
@property
def is_encrypted(self):
"""``True`` if this message is encrypted; otherwise, ``False``"""
return isinstance(self._message, (SKEData, IntegrityProtectedSKEData))
@property
def is_sensitive(self):
"""``True`` if this message is marked sensitive; otherwise ``False``"""
return self.type == 'literal' and self._message.filename == '_CONSOLE'
@property
def is_signed(self):
"""
``True`` if this message is signed; otherwise, ``False``.
Should always be ``False`` if the message is encrypted.
"""
return len(self._signatures) > 0
@property
def issuers(self):
"""A ``set`` containing all key ids (if any) which have signed or encrypted this message."""
return self.encrypters | self.signers
@property
def magic(self):
if self.type == 'cleartext':
return "SIGNATURE"
return "MESSAGE"
@property
def message(self):
"""The message contents"""
if self.type == 'cleartext':
return self.bytes_to_text(self._message)
if self.type == 'literal':
return self._message.contents
if self.type == 'encrypted':
return self._message
@property
def signatures(self):
"""A ``set`` containing all key ids (if any) which have signed this message."""
return list(self._signatures)
@property
def signers(self):
"""A ``set`` containing all key ids (if any) which have signed this message."""
return set(m.signer for m in self._signatures)
@property
def type(self):
##TODO: it might be better to use an Enum for the output of this
if isinstance(self._message, (six.string_types, six.binary_type, bytearray)):
return 'cleartext'
if isinstance(self._message, LiteralData):
return 'literal'
if isinstance(self._message, (SKEData, IntegrityProtectedSKEData)):
return 'encrypted'
raise NotImplementedError
def __init__(self):
"""
PGPMessage objects represent OpenPGP message compositions.
PGPMessage implements the `__str__` method, the output of which will be the message composition in
OpenPGP-compliant ASCII-armored format.
PGPMessage implements the `__bytes__` method, the output of which will be the message composition in
OpenPGP-compliant binary format.
Any signatures within the PGPMessage that are marked as being non-exportable will not be included in the output
of either of those methods.
"""
super(PGPMessage, self).__init__()
self._compression = CompressionAlgorithm.Uncompressed
self._message = None
self._mdc = None
self._signatures = SorteDeque()
self._sessionkeys = []
def __bytearray__(self):
if self.is_compressed:
comp = CompressedData()
comp.calg = self._compression
comp.packets = [pkt for pkt in self]
comp.update_hlen()
return comp.__bytearray__()
_bytes = bytearray()
for pkt in self:
_bytes += pkt.__bytearray__()
return _bytes
def __str__(self):
if self.type == 'cleartext':
tmpl = u"-----BEGIN PGP SIGNED MESSAGE-----\n" \
u"{hhdr:s}\n" \
u"{cleartext:s}\n" \
u"{signature:s}"
# only add a Hash: header if we actually have at least one signature
hashes = set(s.hash_algorithm.name for s in self.signatures)
hhdr = 'Hash: {hashes:s}\n'.format(hashes=','.join(sorted(hashes))) if hashes else ''
return tmpl.format(hhdr=hhdr,
cleartext=self.dash_escape(self.bytes_to_text(self._message)),
signature=super(PGPMessage, self).__str__())
return super(PGPMessage, self).__str__()
def __iter__(self):
if self.type == 'cleartext':
for sig in self._signatures:
yield sig
elif self.is_encrypted:
for pkt in self._sessionkeys:
yield pkt
yield self.message
else:
##TODO: is it worth coming up with a way of disabling one-pass signing?
for sig in self._signatures:
ops = sig.make_onepass()
if sig is not self._signatures[-1]:
ops.nested = True
yield ops
yield self._message
if self._mdc is not None: # pragma: no cover
yield self._mdc
for sig in self._signatures:
yield sig
def __or__(self, other):
if isinstance(other, Marker):
return self
if isinstance(other, CompressedData):
self._compression = other.calg
for pkt in other.packets:
self |= pkt
return self
if isinstance(other, (six.string_types, six.binary_type, bytearray)):
if self._message is None:
self._message = self.text_to_bytes(other)
return self
if isinstance(other, (LiteralData, SKEData, IntegrityProtectedSKEData)):
if self._message is None:
self._message = other
return self
if isinstance(other, MDC):
if self._mdc is None:
self._mdc = other
return self
if isinstance(other, OnePassSignature):
# these are "generated" on the fly during composition
return self
if isinstance(other, Signature):
other = PGPSignature() | other
if isinstance(other, PGPSignature):
self._signatures.insort(other)
return self
if isinstance(other, (PKESessionKey, SKESessionKey)):
self._sessionkeys.append(other)
return self
if isinstance(other, PGPMessage):
self._message = other._message
self._mdc = other._mdc
self._compression = other._compression
self._sessionkeys += other._sessionkeys
self._signatures += other._signatures
return self
raise NotImplementedError(str(type(other)))
def __copy__(self):
msg = super(PGPMessage, self).__copy__()
msg._compression = self._compression
msg._message = copy.copy(self._message)
msg._mdc = copy.copy(self._mdc)
for sig in self._signatures:
msg |= copy.copy(sig)
for sk in self._sessionkeys:
msg |= copy.copy(sk)
return msg
@classmethod
def new(cls, message, **kwargs):
"""
Create a new PGPMessage object.
:param message: The message to be stored.
:type message: ``str``, ``unicode``, ``bytes``, ``bytearray``
:returns: :py:obj:`PGPMessage`
The following optional keyword arguments can be used with :py:meth:`PGPMessage.new`:
:keyword file: if True, ``message`` should be a path to a file. The contents of that file will be read and used
as the contents of the message.
:type file: ``bool``
:keyword cleartext: if True, the message will be cleartext with inline signatures.
:type cleartext: ``bool``
:keyword sensitive: if True, the filename will be set to '_CONSOLE' to signal other OpenPGP clients to treat
this message as being 'for your eyes only'. Ignored if cleartext is True.
:type sensitive: ``bool``
:keyword format: Set the message format identifier. Ignored if cleartext is True.
:type format: ``str``
:keyword compression: Set the compression algorithm for the new message.
Defaults to :py:obj:`CompressionAlgorithm.ZIP`. Ignored if cleartext is True.
:keyword encoding: Set the Charset header for the message.
:type encoding: ``str`` representing a valid codec in codecs
"""
# TODO: have 'codecs' above (in :type encoding:) link to python documentation page on codecs
cleartext = kwargs.pop('cleartext', False)
format = kwargs.pop('format', None)
sensitive = kwargs.pop('sensitive', False)
compression = kwargs.pop('compression', CompressionAlgorithm.ZIP)
file = kwargs.pop('file', False)
charset = kwargs.pop('encoding', None)
filename = ''
mtime = datetime.utcnow()
msg = PGPMessage()
if charset:
msg.charset = charset
# if format in 'tu' and isinstance(message, (six.binary_type, bytearray)):
# # if message format is text or unicode and we got binary data, we'll need to transcode it to UTF-8
# message =
if file and os.path.isfile(message):
filename = message
message = bytearray(os.path.getsize(filename))
mtime = datetime.utcfromtimestamp(os.path.getmtime(filename))
with open(filename, 'rb') as mf:
mf.readinto(message)
# if format is None, we can try to detect it
if format is None:
if isinstance(message, six.text_type):
# message is definitely UTF-8 already
format = 'u'
elif cls.is_ascii(message):
# message is probably text
format = 't'
else:
# message is probably binary
format = 'b'
# if message is a binary type and we're building a textual message, we need to transcode the bytes to UTF-8
if isinstance(message, (six.binary_type, bytearray)) and (cleartext or format in 'tu'):
message = message.decode(charset or 'utf-8')
if cleartext:
msg |= message
else:
# load literal data