-
Notifications
You must be signed in to change notification settings - Fork 61
/
test_emlparser.py
224 lines (176 loc) · 12.7 KB
/
test_emlparser.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
# -*- coding: utf-8 -*-
# pylint: disable=line-too-long
import datetime
import email.policy
import email.utils
import json
import os.path
import typing
from email.headerregistry import Address
from email.message import EmailMessage
import pytest
import eml_parser.eml_parser
# Directory containing this test module (symlinks in the path are resolved).
my_execution_dir = os.path.split(os.path.realpath(__file__))[0]
# One level above the test directory.
parent_dir = os.path.dirname(my_execution_dir)
# The sample e-mails used by the tests are looked up in '<parent_dir>/samples'.
samples_dir = os.path.join(parent_dir, 'samples')
def deep_flatten_object(obj: typing.Any) -> dict:
    """Flatten a nested structure into a mapping of key path to leaf values.

    The output generated by eml_parser is a nested structure of a mix of dicts and lists.
    A simple comparison will not work here, thus what we use this function for is to convert
    the path to a value, through the structure, to a string (the dict keys met on the way
    down, joined with '_').

    Then we fill a new dictionary with the path as key and a list of values as value (as there
    can be more than one value per key, e.g. when a list of dicts is flattened).

    Sequences do not contribute to the path; an empty container therefore produces no entry
    at all. A value of None is stored as an empty string.

    Args:
        obj (object): Any of dict, list, set, tuple (arbitrarily nested); any other object
            is treated as a single leaf value stored under the empty path ''.

    Returns:
        dict: Returns a dict mapping each path string to the list of values found there.

    Raises:
        TypeError: If a dict key on the path is not a string (it cannot be joined).
    """

    def sub(node: typing.Any, path: list) -> typing.Iterator[typing.Tuple[str, typing.Any]]:
        # isinstance() instead of an exact type comparison, so that subclasses
        # (OrderedDict, defaultdict, ...) are descended into rather than being
        # mistaken for leaf values.
        if isinstance(node, dict):
            for key, value in node.items():
                yield from sub(value, path + [key])
        elif isinstance(node, (list, tuple, set)):
            # Members of a sequence share the path of the sequence itself.
            for item in node:
                yield from sub(item, path)
        elif node is None:
            yield ("_".join(path), '')
        else:
            yield ("_".join(path), node)

    flat_kv = {}  # type: typing.Dict[str, typing.List[typing.Any]]

    for path_key, leaf in sub(obj, []):
        flat_kv.setdefault(path_key, []).append(leaf)

    return flat_kv
def recursive_compare(element_a: typing.Dict[str, str], element_b: typing.Dict[str, str]) -> None:
    """Assert that everything found in element_a is also present in element_b.

    Both inputs are flattened with deep_flatten_object(); afterwards every flattened
    key of element_a must exist in element_b and every value stored under that key
    must be among the values element_b holds for the same key.

    The check is one-directional: additional keys or values in element_b are accepted.

    Args:
        element_a (dict): Input element a (the reference).
        element_b (dict): Input element b (the structure being checked).

    Raises:
        AssertionError: If a key or value of element_a is missing from element_b.
    """
    reference = deep_flatten_object(element_a)
    candidate = deep_flatten_object(element_b)

    # Keys are walked in sorted order so the first failure reported is deterministic.
    for path in sorted(reference):
        assert path in candidate

        available = candidate[path]
        for wanted in reference[path]:
            assert wanted in available
def json_serial(obj: typing.Any) -> typing.Optional[str]:
    """Fallback serializer handed to json.dumps() through its 'default' argument.

    Args:
        obj (object): The object the json module could not serialize on its own.

    Returns:
        str: The ISO 8601 representation when obj is a datetime.datetime,
        otherwise None.
    """
    if not isinstance(obj, datetime.datetime):
        return None

    return obj.isoformat()
class TestEMLParser:
    """Tests for the helper and parsing functions of eml_parser.eml_parser."""

    def test_get_file_hash(self):
        """The hashes computed for 'sample.eml' have to match the pre-computed ones."""
        sample_path = os.path.join(samples_dir, 'sample.eml')

        with open(sample_path, 'rb') as sample_file:
            raw_email = sample_file.read()

        pre_computed_hashes = {
            'sha256': '99798841db2f773a11ead628526ab4d6226187e20ca715e3439bb7375806b275',
            'md5': '2c5e3f62e6d2b1511a0f5e7476bca46a',
            'sha512': '3a3d78e6cb8a5e0740fbfdf36083d9da950a60843bb240990ab30fa4062e608a17770a582de3d13b5240727531cfb98a826fbcc6aadd371f541acabb7c9f98e7',
            'sha1': 'effbc0f4702f8d8d1d4911a6f0228013919c2cdc',
        }

        computed_hashes = eml_parser.eml_parser.get_file_hash(raw_email)
        assert computed_hashes == pre_computed_hashes

    def test_wrap_hash_sha256(self):
        """Checks wrap_hash_sha256 against a known digest."""
        digest = eml_parser.eml_parser.wrap_hash_sha256('www.example.com')
        assert digest == '80fc0fb9266db7b83f85850fa0e6548b6d70ee68c8b5b412f1deea6ebdef0404'

    def test_get_uri_ondata(self):
        """Checks the list of URLs get_uri_ondata returns for a text with embedded URLs."""
        test_urls = '''Lorem ipsum dolor sit amet, consectetur adipiscing elit.
Mauris consectetur mi tortor, http://www.example.com consectetur iaculis orci ultricies sit amet.
Mauris "http://www.example.com/test1?bla" ornare lobortis ex nec dictum. Aliquam blandit arcu ac lorem iaculis aliquet.
Praesent a tempus dui, eu feugiat diam. Interdum http://www.example.com/a/b/c/d/ et malesuada fames ac ante ipsum primis in faucibus.
Suspendisse ac rutrum leo, non vehicula purus. Quisque quis sapien lorem. Nunc velit enim,
placerat quis vestibulum at, hxxps://www.example2.com condimentum non velit.'''

        # The 'hxxps' spelling in the input is expected to come back as 'https'.
        expected_result = [
            'http://www.example.com',
            'http://www.example.com/test1?bla',
            'http://www.example.com/a/b/c/d/',
            'https://www.example2.com',
        ]

        found_urls = eml_parser.eml_parser.get_uri_ondata(test_urls)
        assert found_urls == expected_result

    def test_headeremail2list_1(self):
        """Checks that both recipients of a generated e-mail are extracted from the 'to' header."""
        msg = EmailMessage()
        msg['Subject'] = 'Test subject éèàöüä${}'
        msg['From'] = Address("John Doe", "john.doe", "example.com")
        msg['To'] = (Address("Jané Doe", "jane.doe", "example.com"),
                     Address("James Doe", "james.doe", "example.com"))
        msg.set_content('''Hi,
Lorem ipsüm dolor sit amét, consectetur 10$ + 5€ adipiscing elit. Praesent feugiat vitae tellus et molestie. Duis est ipsum, tristique eu pulvinar vel, aliquet a nibh. Vestibulum ultricies semper euismod. Maecenas non sagittis elit. Mauris non feugiat leo. Cras vitae quam est. Donec dapibus justo ut dictum viverra. Aliquam eleifend tortor mollis, vulputate ante sit amet, sodales elit. Fusce scelerisque congue risus mollis pellentesque. Sed malesuada erat sit amet nisl laoreet mollis. Suspendisse potenti. Fusce cursus, tortor sit amet euismod molestie, sem enim semper quam, eu ultricies leo est vel turpis.
''')

        recipients = eml_parser.eml_parser.headeremail2list(mail=msg, header='to')

        # Sorted, so the test does not depend on the order of the returned addresses.
        assert sorted(recipients) == ['james.doe@example.com', 'jane.doe@example.com']

    def test_headeremail2list_2(self):
        """Here we test the headeremail2list function using an input which should trigger
        a email library bug 27257
        """
        bug_sample_path = os.path.join(samples_dir, 'sample_bug27257.eml')

        with open(bug_sample_path, 'rb') as sample_file:
            raw_email = sample_file.read()

        msg = email.message_from_bytes(raw_email, policy=email.policy.default)

        # just to be sure we still hit bug 27257 (else there is no more need for the workaround)
        with pytest.raises(AttributeError):
            msg.items()

        # our parsing function should trigger an exception leading to the parsing
        # using a workaround
        recipients = eml_parser.eml_parser.headeremail2list(mail=msg, header='to')
        assert recipients == ['test@example.com']

    def test_parse_email_1(self):
        """Parses a generated sample e-mail and tests it against a known good result"""
        msg = EmailMessage()
        msg['Subject'] = 'Test subject éèàöüä${}'
        msg['From'] = Address("John Doe", "john.doe", "example.com")
        msg['To'] = (Address("Jané Doe", "jane.doe", "example.com"),
                     Address("James Doe", "james.doe", "example.com"))
        msg.set_content('''Hi,
Lorem ipsüm dolor sit amét, consectetur 10$ + 5€ adipiscing elit. Praesent feugiat vitae tellus et molestie. Duis est ipsum, tristique eu pulvinar vel, aliquet a nibh. Vestibulum ultricies semper euismod. Maecenas non sagittis elit. Mauris non feugiat leo. Cras vitae quam est. Donec dapibus justo ut dictum viverra. Aliquam eleifend tortor mollis, vulputate ante sit amet, sodales elit. Fusce scelerisque congue risus mollis pellentesque. Sed malesuada erat sit amet nisl laoreet mollis. Suspendisse potenti. Fusce cursus, tortor sit amet euismod molestie, sem enim semper quam, eu ultricies leo est vel turpis.
''')

        good_output_json = r'''{"header": {"header": {"content-transfer-encoding": ["quoted-printable"], "content-type": ["text/plain; charset=\"utf-8\""], "from": ["John Doe <john.doe@example.com>"], "subject": ["Test subject \u00e9\u00e8\u00e0\u00f6\u00fc\u00e4${}"], "to": ["Jan\u00e9 Doe <jane.doe@example.com>, James Doe <james.doe@example.com>"], "mime-version": ["1.0"]}, "from": "john.doe@example.com", "subject": "Test subject \u00e9\u00e8\u00e0\u00f6\u00fc\u00e4${}", "received": [], "date": "1970-01-01T00:00:00+00:00", "to": ["jane.doe@example.com", "james.doe@example.com"]}, "body": [{"content_header": {"content-transfer-encoding": ["quoted-printable"], "content-type": ["text/plain; charset=\"utf-8\""]}, "hash": "f765993eba20df87927f5bf6e947696d48bdf936e75508b9d126bbe8aa1a1497", "content_type": "text/plain"}]}'''
        good_output = json.loads(good_output_json)

        # Round-trip the parser output through JSON so that both sides of the
        # comparison consist of plain JSON types only.
        parsed = eml_parser.eml_parser.parse_email(msg)
        test_output_json = json.dumps(parsed, default=json_serial)
        test_output = json.loads(test_output_json)

        recursive_compare(good_output, test_output)

    def test_parse_email_2(self):
        """Parses the e-mails from the samples folder"""
        # First pass: let the parser read every sample from its path.
        for sample_name in os.listdir(samples_dir):
            eml_parser.eml_parser.decode_email(os.path.join(samples_dir, sample_name))

        # Second pass: hand over the raw bytes of every sample.
        for sample_name in os.listdir(samples_dir):
            with open(os.path.join(samples_dir, sample_name), 'rb') as sample_file:
                raw_email = sample_file.read()

            eml_parser.eml_parser.decode_email_b(raw_email)

    def test_parse_email_3(self):
        """Parses the e-mails from the samples folder while keeping raw data"""
        options = {'include_raw_body': True, 'include_attachment_data': True}

        # First pass: let the parser read every sample from its path.
        for sample_name in os.listdir(samples_dir):
            eml_parser.eml_parser.decode_email(os.path.join(samples_dir, sample_name), **options)

        # Second pass: hand over the raw bytes of every sample.
        for sample_name in os.listdir(samples_dir):
            with open(os.path.join(samples_dir, sample_name), 'rb') as sample_file:
                raw_email = sample_file.read()

            eml_parser.eml_parser.decode_email_b(raw_email, **options)

    def test_parse_email_4(self):
        """Parses the e-mails from the samples folder while keeping raw data and passing
        in a filtering config 'pconf'"""
        pconf = {
            'whiteip': ['192.168.1.1'],
            'whitefor': ['a@example.com'],
            'byhostentry': ['example.com'],
        }
        options = {'include_raw_body': True, 'include_attachment_data': True, 'pconf': pconf}

        # First pass: let the parser read every sample from its path.
        for sample_name in os.listdir(samples_dir):
            eml_parser.eml_parser.decode_email(os.path.join(samples_dir, sample_name), **options)

        # Second pass: hand over the raw bytes of every sample.
        for sample_name in os.listdir(samples_dir):
            with open(os.path.join(samples_dir, sample_name), 'rb') as sample_file:
                raw_email = sample_file.read()

            eml_parser.eml_parser.decode_email_b(raw_email, **options)

    def test_parse_email_5(self):
        """Parses a generated sample e-mail and tests it against a known good result. In this test
        we want to specifically ignore e-mail addresses without TLD."""
        msg = EmailMessage()
        msg['Subject'] = 'Test subject éèàöüä${}'
        msg['From'] = Address("John Doe", "john.doe", "example")
        msg['To'] = (Address("Jané Doe", "jane.doe", "example.com"),
                     Address("James Doe", "james.doe", "example.com"))
        msg['Cc'] = (Address("Jané Doe", "jane.doe", "example"),
                     Address("James Doe", "james.doe", "example"))
        msg.set_content('''Hi,
Lorem ipsüm dolor sit amét, consectetur 10$ + 5€ adipiscing elit. Praesent feugiat vitae tellus et molestie. Duis est ipsum, tristique eu pulvinar vel, aliquet a nibh. Vestibulum ultricies semper euismod. Maecenas non sagittis elit. Mauris non feugiat leo. Cras vitae quam est. Donec dapibus justo ut dictum viverra. Aliquam eleifend tortor mollis, vulputate ante sit amet, sodales elit. Fusce scelerisque congue risus mollis pellentesque. Sed malesuada erat sit amet nisl laoreet mollis. Suspendisse potenti. Fusce cursus, tortor sit amet euismod molestie, sem enim semper quam, eu ultricies leo est vel turpis.
You should subscribe by replying to test-reply@example.
''')

        good_output_json = r'''{"body": [{"content_header": {"content-type": ["text/plain; charset=\"utf-8\""], "content-transfer-encoding": ["quoted-printable"]}, "content_type": "text/plain", "hash": "07de6840458e398906e73b2cd188d0da813a80ee0337cc349228d983b5ec1c7e"}], "header": {"subject": "Test subject \u00e9\u00e8\u00e0\u00f6\u00fc\u00e4${}", "from": "john.doe@example", "to": ["jane.doe@example.com", "james.doe@example.com"], "date": "1970-01-01T00:00:00+00:00", "received": [], "header": {"cc": ["Jan\u00e9 Doe <jane.doe@example>, James Doe <james.doe@example>"], "from": ["John Doe <john.doe@example>"], "content-type": ["text/plain; charset=\"utf-8\""], "mime-version": ["1.0"], "subject": ["Test subject \u00e9\u00e8\u00e0\u00f6\u00fc\u00e4${}"], "to": ["Jan\u00e9 Doe <jane.doe@example.com>, James Doe <james.doe@example.com>"], "content-transfer-encoding": ["quoted-printable"]}}}'''
        good_output = json.loads(good_output_json)

        # Round-trip the parser output through JSON so that both sides of the
        # comparison consist of plain JSON types only.
        parsed = eml_parser.eml_parser.decode_email_b(msg.as_bytes(), email_force_tld=True)
        test_output_json = json.dumps(parsed, default=json_serial)
        test_output = json.loads(test_output_json)

        recursive_compare(good_output, test_output)