/
pkcs7.py
147 lines (128 loc) · 5.33 KB
/
pkcs7.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json
from contextlib import suppress
from datetime import datetime
from refinery.units import Unit
from refinery.lib.json import BytesAsArrayEncoder
class pkcs7(Unit):
    """
    Converts PKCS7 encoded data to a JSON representation.
    """
    # NOTE(review): declared without `self` and read below as an attribute
    # (`self._asn1crypto.core`), so `Unit.Requires` presumably wraps this into a
    # lazily resolved module reference - confirm against refinery.units.
    @Unit.Requires('asn1crypto', 'default', 'extended')
    def _asn1crypto():
        import asn1crypto
        import asn1crypto.cms
        import asn1crypto.core
        import asn1crypto.x509
        return asn1crypto

    def process(self, data: bytes):
        # Parse the input as a CMS ContentInfo, serialize it to JSON with a custom
        # encoder, post-process integers, and return the pretty-printed JSON
        # encoded with self.codec.
        asn1 = self._asn1crypto.core
        cms = self._asn1crypto.cms
        signature = cms.ContentInfo.load(data)

        def unsign(data):
            # Recursively walk decoded JSON data and normalize integers:
            # - negative integers are reinterpreted as the unsigned value of their
            #   minimal two's-complement encoding in whole bytes,
            # - integers that do not fit into 64 bits are replaced by a big-endian
            #   hex string.
            # Dicts and lists are rebuilt with converted members; any other value
            # is returned unchanged.
            if isinstance(data, int):
                if data < 0:
                    # A negative n needs (~n).bit_length() value bits plus one sign
                    # bit. Rounding that up to whole bytes keeps the result equal
                    # to the unsigned reading of the encoded bytes (-1 -> 0xFF,
                    # -128 -> 0x80) instead of a value that is off by the padding.
                    width = (~data).bit_length() // 8 + 1
                    data += 1 << (8 * width)
                if data > 0xFFFFFFFF_FFFFFFFF:
                    # The byte count is derived from the value that is actually
                    # converted, so to_bytes cannot overflow after the sign
                    # conversion above.
                    width = (data.bit_length() + 7) // 8
                    data = data.to_bytes(width, 'big').hex()
                return data
            elif isinstance(data, dict):
                return {key: unsign(value) for key, value in data.items()}
            elif isinstance(data, list):
                return [unsign(x) for x in data]
            else:
                return data

        # ASN.1 specifications used to parse the attribute that is registered
        # below under the name 'authenticode_info'.
        class SpcString(asn1.Choice):
            _alternatives = [
                ('unicode', asn1.BMPString, {'implicit': 0}),
                ('ascii', asn1.IA5String, {'implicit': 1})
            ]

        SpcUuid = asn1.OctetString

        class SpcSerializedObject(asn1.Sequence):
            _fields = [
                ('classId', SpcUuid),
                ('serializedData', asn1.OctetString),
            ]

        class SpcLink(asn1.Choice):
            _alternatives = [
                ('url', asn1.IA5String, {'implicit': 0}),
                ('monikier', SpcSerializedObject, {'implicit': 1}),
                ('file', SpcString, {'explicit': 2})
            ]

        class SpcSpOpusInfo(asn1.Sequence):
            _fields = [
                ('programName', SpcString, {'optional': True, 'explicit': 0}),
                ('moreInfo', SpcLink, {'optional': True, 'explicit': 1}),
            ]

        class SetOfInfos(asn1.SetOf):
            _child_spec = SpcSpOpusInfo

        # Register the OID and its parser on asn1crypto's CMS attribute tables.
        # These are class-level mappings, so the assignment is repeated on every
        # call to process.
        cms.CMSAttributeType._map['1.3.6.1.4.1.311.2.1.12'] = 'authenticode_info'
        cms.CMSAttribute._oid_specs['authenticode_info'] = SetOfInfos

        class ParsedASN1ToJSON(BytesAsArrayEncoder):
            # JSON encoder for parsed ASN.1 values; the owning unit is kept as a
            # class attribute to reach its asn1crypto module from `default`.
            unit = self

            @classmethod
            def _is_keyval(cls, obj):
                # True for dicts of the exact shape {'type': ..., 'values': [x]}.
                return (
                    isinstance(obj, dict)
                    and set(obj.keys()) == {'type', 'values'}
                    and len(obj['values']) == 1
                )

            @classmethod
            def handled(cls, obj) -> bool:
                return BytesAsArrayEncoder.handled(obj) or cls._is_keyval(obj)

            def encode_bytes(self, obj: bytes):
                # Emit printable byte strings as text; anything else (including
                # any failure while checking) falls back to the base encoding.
                with suppress(Exception):
                    string = obj.decode('latin1')
                    if string.isprintable():
                        return string
                return super().encode_bytes(obj)

            def default(self, obj):
                # Try progressively more generic conversions; the order matters
                # because several of the checks below overlap.
                if self._is_keyval(obj):
                    # Collapse a single-valued attribute into {'type', 'value'}.
                    return dict(type=obj['type'], value=obj['values'][0])
                with suppress(TypeError):
                    return super().default(obj)
                if isinstance(obj, (set, tuple)):
                    return list(obj)
                if isinstance(obj, datetime):
                    return str(obj)
                dict_result = {}
                list_result = None
                if isinstance(obj, self.unit._asn1crypto.x509.Certificate):
                    dict_result.update(fingerprint=obj.sha1.hex())
                if isinstance(obj, asn1.BitString):
                    return {'bit_string': obj.native}
                # Best effort: iterate the object. If iteration yields only
                # strings, treat them as keys and build a mapping from them.
                with suppress(Exception):
                    list_result = list(obj)
                    if all(isinstance(k, str) for k in list_result):
                        dict_result.update((key, obj[key]) for key in list_result)
                if dict_result:
                    return dict_result
                if list_result is not None:
                    return list_result
                if isinstance(obj, self.unit._asn1crypto.cms.CertificateChoices):
                    return obj.chosen
                if isinstance(obj, asn1.Sequence):
                    children = obj.children
                    if children:
                        return children
                    return obj.dump()
                with suppress(Exception):
                    return obj.native
                if isinstance(obj, asn1.Any):
                    parsed = None
                    with suppress(Exception):
                        parsed = obj.parse()
                    if parsed:
                        return parsed
                    return obj.dump()
                if isinstance(obj, asn1.Asn1Value):
                    return obj.dump()
                raise ValueError(F'Unable to determine JSON encoding of {obj.__class__.__name__} object.')

        # NOTE(review): the encoder class itself is used as a context manager;
        # this relies on support provided by BytesAsArrayEncoder - confirm in
        # refinery.lib.json.
        with ParsedASN1ToJSON as encoder:
            encoded = encoder.dumps(signature)
            # Round-trip through plain JSON data so that unsign only ever sees
            # dicts, lists and scalars.
            converted = unsign(json.loads(encoded))
            return json.dumps(converted, indent=4).encode(self.codec)