-
Notifications
You must be signed in to change notification settings - Fork 6
/
parse_mbox.py
247 lines (203 loc) · 7.58 KB
/
parse_mbox.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
from html.parser import HTMLParser
from email.header import Header, decode_header
import mailbox
import base64
import quopri
import re
import sys
import html2text
""" ____Format utils____ """
class MLStripper(HTMLParser):
    """
    HTML parser that throws the markup away and keeps only the text nodes.

    Adapted from
    https://stackoverflow.com/questions/753052/strip-html-from-strings-in-python
    """

    def __init__(self):
        """
        Initialise the underlying parser and the list of collected text
        """
        # Text fragments seen so far, in document order
        self.fed = []
        super().__init__()
        self.reset()

    def handle_data(self, d):
        """
        Parser callback: remember the text fragment :d:
        """
        self.fed.append(d)

    def get_data(self):
        """
        Return all the text collected so far as one string
        """
        collected = "".join(self.fed)
        return collected
def strip_tags(html):
    """
    Use the MLStripper class to strip HTML from a string.

    :html: markup to clean.
    Returns the text content of :html: with every tag removed.
    """
    s = MLStripper()
    s.feed(html)
    # feed() keeps trailing text buffered while it could still be the start
    # of an unfinished character reference or tag (e.g. the "&T" in "AT&T").
    # close() forces the parser to process that remainder so no text is lost.
    s.close()
    return s.get_data()
def strip_payload(payload):
    """
    Flatten :payload: onto a single line.

    Every carriage return and every line feed is replaced by one space
    (so a CR+LF pair becomes two spaces).
    """
    line_breaks_to_spaces = {ord('\r'): ' ', ord('\n'): ' '}
    return payload.translate(line_breaks_to_spaces)
def encoded_words_to_text(encoded_words):
    """
    Decode a single RFC 2047 encoded word ("=?charset?encoding?text?=").

    Not used, left for reference only
    https://dmorgan.info/posts/encoded-word-syntax/

    :encoded_words: string that starts with an encoded word. The encoding
    flag is "B" (base64) or "Q" (quoted-printable), in either case.
    Returns the decoded text as a str.
    Raises ValueError when :encoded_words: does not start with an encoded word.
    """
    # None of the three fields may contain "?", which keeps the match confined
    # to the first encoded word when several follow each other
    encoded_word_regex = r'=\?([^?]+)\?([BQ])\?([^?]*)\?='
    match = re.match(encoded_word_regex, encoded_words, re.IGNORECASE)
    if match is None:
        raise ValueError('Not an encoded word: {!r}'.format(encoded_words))
    charset, encoding, encoded_text = match.groups()
    if encoding.upper() == 'B':
        # Mail clients sometimes drop the trailing "=" padding
        padding = '=' * (-len(encoded_text) % 4)
        byte_string = base64.b64decode(encoded_text + padding)
    else:
        # header=True makes "_" decode to a space, as the Q encoding requires
        byte_string = quopri.decodestring(encoded_text, header=True)
    return byte_string.decode(charset)
""" ____Custom Message class____ """
class CustomMessage():
    """
    The CustomMessage class represents an email message with three fields:
    - :body: decoded single-line text for text parts, the payload as received otherwise
    - :subject: decoded single-line subject ('' when there is none)
    - :content_type: (document, plain text, HTML, image...)
    """

    def __init__(self, body, subject, content_type):
        """
        Constructor
        It decodes every encoded part of the subject, whatever its charset
        It decodes the body based on the content type

        :body: payload of the message part (bytes, or None)
        :subject: str, email.header.Header or None
        :content_type: MIME type of the part, e.g. 'text/plain'
        """
        self.content_type = content_type
        self.subject = self._decode_subject(subject)
        self.body = self._decode_body(body, content_type)

    @staticmethod
    def _decode_chunk(chunk, charset=None):
        """
        Return :chunk: as a str.
        Bytes are decoded with :charset: when it is usable, then utf-8,
        then latin-1 (which accepts any byte sequence).
        """
        if isinstance(chunk, str):
            return chunk
        codecs_to_try = ['utf-8']
        # 'unknown-8bit' is the placeholder the email package gives to raw
        # non-ASCII headers; it is not a real codec
        if charset and charset.lower() != 'unknown-8bit':
            codecs_to_try.insert(0, charset)
        for codec in codecs_to_try:
            try:
                return chunk.decode(codec)
            except (UnicodeDecodeError, LookupError):
                # Wrong or unknown charset announced by the sender
                continue
        return chunk.decode('latin-1')

    @classmethod
    def _decode_subject(cls, subject):
        """
        Return :subject: as a decoded, single-line str ('' for None).
        """
        if subject is None:
            # Empty subject
            return ''
        from email.errors import HeaderParseError
        if isinstance(subject, Header):
            # Join all the chunks of the Header instead of keeping the first one
            subject = ''.join(cls._decode_chunk(chunk, charset)
                              for chunk, charset in decode_header(subject))
        # The subject can have several parts encoded in different formats
        # These parts are flagged with strings like '=?UTF-8?'
        # decode_header returns a plain subject untouched
        try:
            subject_parts = decode_header(subject)
        except HeaderParseError:
            # Malformed encoded word: keep the raw text rather than failing
            subject_parts = [(subject, None)]
        decoded = ''.join(cls._decode_chunk(chunk, charset)
                          for chunk, charset in subject_parts)
        return strip_payload(decoded)

    @classmethod
    def _decode_body(cls, body, content_type):
        """
        Return the body to store: single-line text for text parts,
        :body: itself for any other content type.
        """
        if 'text' not in content_type:
            # If not text, return the body as it is
            return body
        # A part without payload is treated as an empty text
        decoded_body = cls._decode_chunk(body if body is not None else b'')
        if 'html' in content_type:
            # If it is an HTML message, remove HTML tags
            h = html2text.HTML2Text()
            h.ignore_links = True
            h.ignore_tables = True
            h.ignore_images = True
            h.ignore_anchors = True
            h.ignore_emphasis = True
            decoded_body = h.handle(decoded_body)
        return strip_payload(decoded_body)

    def __str__(self):
        """
        Printable summary: content type, subject and at most 2000 characters of body
        """
        body_length = 2000
        printed_body = self.body[:body_length] if self.body is not None else ''
        # Shorten long message bodies (text only)
        if 'text' in self.content_type and len(self.body) > body_length:
            printed_body += "..."
        return " ---- Custom Message ---- \n -- Content Type: {}\n -- Subject: {}\n -- Body --\n{}\n\n".format(self.content_type, self.subject, printed_body)

    def get_body(self):
        """
        Return the stored body
        """
        return self.body

    def get_subject(self):
        """
        Return the decoded subject
        """
        return self.subject

    def get_content_type(self):
        """
        Return the content type given to the constructor
        """
        return self.content_type

    def create_vector_line(self, label):
        """
        Creates a CSV line with the message's body and given :label:
        Removes any commas from body and label so the line keeps two columns
        Expects a text message (the body must be a str)
        """
        return '{body},{label}'.format(
            body=self.body.replace(',', ''),
            label=str(label).replace(',', ''),
        )

    @staticmethod
    def extract_types_from_messages(messages):
        """
        Takes a list of CustomMessage and extracts all the existing values for content_type
        ['application/ics', 'application/octet-stream', 'application/pdf', 'image/gif', 'image/jpeg',
        'image/png', 'text/calendar', 'text/html', 'text/plain', 'text/x-amp-html']
        Returns the distinct values as a sorted list
        """
        return sorted({m.get_content_type() for m in messages})
""" ____Extraction utils____ """
def extract_message_payload(mes, parent_subject=None):
    """
    Extracts recursively the payload of the messages contained in :mes:
    When a message is embedded in another, it uses the parameter :parent_subject:
    to set the subject properly (it uses the parent's subject)
    Returns a list of CustomMessage, one per non-multipart part.
    """
    # An embedded part inherits the subject of the enclosing message.
    # A message without parent uses its own 'Subject' header, including when
    # it is not multipart (otherwise its subject would be lost).
    if parent_subject is not None:
        subject = parent_subject
    else:
        subject = mes.get('Subject')
    if not mes.is_multipart():
        return [CustomMessage(mes.get_payload(decode=True), subject, mes.get_content_type())]
    extracted_messages = []
    # For a multipart message get_payload() returns the list of sub-messages
    for part in mes.get_payload():
        extracted_messages.extend(extract_message_payload(part, subject))
    return extracted_messages
def text_messages_to_string(mes):
    """
    Concatenates the printable form (str) of every text message in :mes:
    Messages whose content type does not start with 'text'
    (images, documents...) are skipped.
    :mes: should be a list of CustomMessage objects.
    """
    return ''.join(
        str(message)
        for message in mes
        if message.get_content_type().startswith('text')
    )
def create_classification_line(mes, label):
    """
    Builds the CSV text for :mes:, one line per kept message, with two
    columns: the message's body and its classification (:label:)
    Only text messages are kept; calendar messages ('text/calendar'),
    images and documents are skipped.
    :mes: should be a list of CustomMessage objects.
    """
    lines = []
    for message in mes:
        content_type = message.get_content_type()
        if content_type == 'text/calendar' or not content_type.startswith('text'):
            continue
        lines.append(message.create_vector_line(label) + '\n')
    return ''.join(lines)
def to_file(text, file):
    """
    Writes :text: to :file:, replacing any previous content.

    :text: string to write
    :file: path of the output file
    The file is written as UTF-8 so that any character found in an email
    can be stored regardless of the system locale, and it is always closed
    (even when the write fails).
    """
    with open(file, 'w', encoding='utf-8') as f:
        f.write(text)
def extract_mbox_file(file):
    """
    Extracts all the messages included in an mbox :file:
    by calling extract_message_payload

    :file: path of an existing mbox file
    Returns the list of extracted messages.
    Raises mailbox.NoSuchMailboxError when :file: does not exist.
    """
    # create=False: by default mailbox.mbox silently creates an empty file
    # when the path does not exist, which hides a mistyped path
    mbox = mailbox.mbox(file, create=False)
    try:
        messages = []
        for message in mbox:
            messages.extend(extract_message_payload(message))
        return messages
    finally:
        # Release the file handle held by the mailbox
        mbox.close()
if __name__ == '__main__':
    # The script takes exactly one argument: the path of the mbox file
    if len(sys.argv) == 2:
        file = sys.argv[1]
        messages = extract_mbox_file(file)
        # Call to create a CSV file with the extracted data (body + label)
        # to_file(create_classification_line(messages, 'label'), file + '_features.csv')
        # Call to export all the extracted data
        # to_file(text_messages_to_string(messages), file + '_full_extract')
    else:
        print('Invalid arguments')