-
-
Notifications
You must be signed in to change notification settings - Fork 0
/
SFHandlerClass.py
435 lines (365 loc) · 14 KB
/
SFHandlerClass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
# -*- coding: utf-8 -*-
# Disables deprecated urllib function warning where we use urllib in
# addFileURI below.
#
# Also disables import warnings as we try and import for PY2 and PY3
# together.
#
# pylint: disable=W1658,E1101,E0611,E0401
"""SFHandlerClass provides the functions needed to understand a
Siegfried YAML file so that it can be parsed into an sqlite DB.
"""
from __future__ import absolute_import
# PY3 compatibility (PY3 first)
try:
from urllib import parse, request
except ImportError:
import urllib
import urlparse
import codecs
import ntpath
import os.path
if __name__.startswith("sqlitefid"):
from sqlitefid.libs.PyDateHandler import PyDateHandler
else:
from libs.PyDateHandler import PyDateHandler
class SFYAMLHandler:
    """Read a Siegfried YAML report into plain dictionaries and lists.

    The report is treated as a sequence of sections separated by ``---``
    lines. The first section is the header (tool information and the
    list of identifiers); every following section describes one file.
    The parsed result is kept in ``self.sfdata`` under the ``header``
    and ``files`` keys.

    Values are returned with single quotes doubled (see
    :meth:`escapevalue`).
    """

    # Markers and key names looked for in the raw YAML.
    YAMLSECTION = "---"
    YAMLNAMESPACE = "name"
    YAMLDETAILS = "details"

    # Keys (or key prefixes, completed with a 1-based counter) used to
    # store identifier information in the header dictionary.
    HEADDETAILS = "id details "
    HEADNAMESPACE = "id namespace "
    HEADCOUNT = "identifier count"

    FILERECORDLEN = 6

    # Keys that name a checksum; the first one seen becomes ``hashtype``.
    hashes = ["md5", "sha1", "sha256", "sha512", "crc"]

    # Keys copied straight into the per-file dictionary.
    fileheaders = [
        "filename",
        "filesize",
        "modified",
        "errors",
        "md5",
        "sha1",
        "sha256",
        "sha512",
        "crc",
    ]

    # Keys that belong to an identification (one per namespace).
    iddata = ["ns", "id", "format", "version", "mime", "basis", "warning"]

    # Container type -> identifier that marks a file as that container.
    containers = {
        "zip": "x-fmt/263",
        "gz": "x-fmt/266",
        "tar": "x-fmt/265",
        "warc": "fmt/289",
        "arc": "x-fmt/219",
        "arc_1": "fmt/410",
    }

    # Substrings searched for in the "warning" and "basis" values.
    mismatch_warning = "extension mismatch"
    filename_only = "match on filename only"
    extension_only_one = "match on extension only"
    extension_only_two = "extension match"
    text_basis = "text match"
    byte_basis = "byte match"
    container_basis_one = "container match"
    container_basis_two = "container name"
    xml_basis = "xml match"

    # Returned by readSFYAML when sections and parsed files disagree.
    PROCESSING_ERROR = -1

    DICTHEADER = "header"
    DICTFILES = "files"
    DICTID = "identification"

    TYPE_CONTAINER = "Container"
    TYPEFILE = "File"

    # additional fields given to SF output
    FIELD_FILE_NAME = "filename"
    FIELDURI = "uri"
    FIELDURISCHEME = "uri scheme"
    FIELDDIRNAME = "directory"
    FIELDYEAR = "year"
    FIELDCONTTYPE = "containertype"
    FIELDTYPE = "type"
    FIELDMETHOD = "method"
    FIELDMISMATCH = "extension mismatch"
    FIELDEXT = "ext"
    FIELDVERSION = "version"

    def __init__(self):
        """Create an empty handler; call :meth:`readSFYAML` to fill it."""
        # date handler class
        self.pydate = PyDateHandler()

        self.sectioncount = 0
        self.identifiercount = 0
        self.header = {}
        self.hashtype = None

        # Structures for holding forms information.
        self.filedetails = {}
        self.iddetails = {}

        # All files in report.
        self.files = []
        self.filecount = 0
        self.sfdata = {}

    def getHeaders(self):
        """Return the parsed header dictionary.

        :returns: Header key/value pairs (dict)
        """
        return self.sfdata[self.DICTHEADER]

    def getIdentifiersList(self):
        """Return the identifier namespaces recorded in the header.

        :returns: Namespace names in the order they were read; empty
            when the header recorded no identifier count (list)
        """
        header = self.sfdata[self.DICTHEADER]
        # The count is absent when the header named no identifiers, so
        # default to zero instead of raising KeyError.
        count = header.get(self.HEADCOUNT, 0)
        return [
            header[self.HEADNAMESPACE + str(number)]
            for number in range(1, count + 1)
        ]

    def getFiles(self):
        """Return the list of parsed file dictionaries.

        :returns: One dictionary per file section (list)
        """
        return self.sfdata[self.DICTFILES]

    @staticmethod
    def stripkey(line):
        """Normalise a key: trim whitespace and drop ``"- "`` markers.

        :param line: Raw key text (string)
        :returns: Cleaned key (string)
        """
        return line.strip().replace("- ", "")

    def stripvalue(self, line):
        """Normalise a value: trim, unwrap single quotes, then escape.

        :param line: Raw value text (string)
        :returns: Cleaned and escaped value (string)
        """
        unquoted = line.strip().lstrip("'").rstrip("'")
        return self.escapevalue(unquoted)

    @staticmethod
    def escapevalue(line):
        """Escape values with single quotes in them.

        Alternative for future reference:

           * http://stackoverflow.com/a/12066822

        :params line: A line of YAML from Siegfried (string)
        :returns: Escaped version of the input line (string)
        """
        return line.replace("'", "''")

    def handleentry(self, line):
        """Split a ``key : value`` line into a cleaned two item list.

        Only the first colon separates key from value, so values may
        contain colons themselves. A line without any colon yields an
        empty value rather than raising IndexError.

        :param line: One line of the report (string)
        :returns: ``[key, value]``; a list because callers replace the
            value in place (list)
        """
        key, _, value = line.partition(":")
        return [self.stripkey(key), self.stripvalue(value)]

    def headersection(self, line):
        """Store one line of the header section in ``self.header``.

        ``name`` lines start a new identifier and ``details`` lines are
        attached to the identifier most recently started. The
        ``identifiers`` line itself carries no value and is skipped, as
        is the section marker.

        :param line: One stripped line of the header (string)
        :returns: None (nonetype)
        """
        if line == self.YAMLSECTION:
            return
        key, value = self.handleentry(line)
        if key == self.YAMLNAMESPACE:
            self.identifiercount += 1
            self.header[self.HEADNAMESPACE + str(self.identifiercount)] = value
            # Record the count as soon as an identifier is named so it
            # is present even if no details line follows.
            self.header[self.HEADCOUNT] = self.identifiercount
        elif key == self.YAMLDETAILS:
            self.header[self.HEADDETAILS + str(self.identifiercount)] = value
            self.header[self.HEADCOUNT] = self.identifiercount
        elif key != "identifiers":
            self.header[key] = value

    def add_file_uri(self, filedict):
        """Add file URIs to filedict structure.

        :param filedict: filedict structure containing information
            about our file.
        :returns: None (nonetype)
        """
        file_uri = self.addFileURI(filedict[self.FIELD_FILE_NAME])
        # ``type`` is only set once an ``id`` line has been seen, so a
        # record without one is treated as a plain file.
        if filedict.get(self.FIELDTYPE) == self.TYPE_CONTAINER:
            file_uri = self.addContainerURI(filedict, file_uri)
        filedict[self.FIELDURI] = file_uri
        filedict[self.FIELDURISCHEME] = self.geturischeme(file_uri)

    def filesection(self, sfrecord):
        """Convert the raw lines of one file section into a dictionary.

        File level keys are copied directly. Identification keys are
        grouped per namespace: every ``ns`` line closes the group being
        built and starts a new one.

        :param sfrecord: A list of non-parsed records from Siegfried
            to be converted. (list[(string)])
        :returns: A file dictionary to be appended to the global file
            list. (dict)
        """
        iddict = {}  # { nsname : {id : x, mime : x } }
        filedict = {}
        ns = ""
        iddata = {}
        for raw in sfrecord:
            key, value = self.handleentry(raw)
            if key in self.fileheaders:
                filedict[key] = value
                # Remember the first checksum type seen in the report.
                if key in self.hashes and self.hashtype is None:
                    self.hashtype = key
            if key not in self.iddata:
                continue
            if key == "ns":
                # TRIGGER: a new namespace closes the previous
                # identification (if any) and opens a fresh one.
                if iddata:
                    iddata.setdefault(self.FIELDVERSION, "")
                    iddict[ns] = iddata
                    iddata = {}
                ns = value
                continue
            if key == "id":
                self.getContainers(value, filedict)
            elif key == "basis":
                if value == "":
                    value = None
                self.getMethod(value, iddata)
            elif key == "warning":
                if value == "":
                    value = None
                self.getMethod(value, iddata, True)
                self.getMismatch(value, iddata)
            elif key == "mime":
                if value in ("UNKNOWN", ""):
                    value = "none"
            iddata[key] = value
        self.add_file_uri(filedict)
        # on loop completion add final id record
        iddata.setdefault(self.FIELDVERSION, "")
        iddict[ns] = iddata
        # add complete id data to filedata, return
        filedict[self.DICTID] = iddict
        return filedict

    def readSFYAML(self, sfname):
        """Parse a Siegfried YAML report from disk.

        Lines before the first section marker are ignored and blank
        lines are skipped. Results accumulate on the instance, so use a
        fresh handler for each report.

        :param sfname: Path of the report to read (string)
        :returns: Number of files parsed, or ``PROCESSING_ERROR`` when
            that number does not equal the number of sections after the
            header (int)
        """
        filedata = []
        with codecs.open(sfname, encoding="utf-8") as sfile:
            for line in sfile:
                line = line.strip()
                if not line:
                    # Nothing to parse; a blank line has no key.
                    continue
                if line == self.YAMLSECTION:
                    self.sectioncount += 1
                    # A new section ends the file record collected so
                    # far (nothing is collected before section two).
                    if filedata:
                        self.files.append(self.filesection(filedata))
                        filedata = []
                elif self.sectioncount == 1:
                    self.headersection(line)
                elif self.sectioncount > 1:
                    filedata.append(line)
        # Add final section of data to list
        if filedata:
            self.files.append(self.filesection(filedata))
        # Attempt at useful return value - number of files processed
        # vs. processing error
        if len(self.files) == self.sectioncount - 1:
            self.filecount = len(self.files)
        else:
            self.filecount = self.PROCESSING_ERROR
        # concatenate header and file details (not needed, but maybe
        # convenient)
        self.sfdata[self.DICTHEADER] = self.header
        self.sfdata[self.DICTFILES] = self.files
        return self.filecount

    def getMismatch(self, warning, iddata):
        """Record whether a warning reports an extension mismatch.

        The flag is always written, so every identification carries a
        boolean: True when the warning text contains the mismatch
        phrase, False otherwise (including when there is no warning).

        :param warning: Warning text, or None when empty (string)
        :param iddata: Identification dictionary to update (dict)
        :returns: None (nonetype)
        """
        iddata[self.FIELDMISMATCH] = (
            warning is not None and self.mismatch_warning in warning
        )

    def getMethod(self, basis, iddata, warning=False):
        """Derive the identification method from basis or warning text.

        Called once with the ``basis`` value and once, with
        ``warning=True``, with the ``warning`` value. A method taken
        from the basis is never replaced by one taken from the warning.

        :param basis: Basis or warning text, None when empty (string)
        :param iddata: Identification dictionary to update (dict)
        :param warning: True when ``basis`` holds warning text (bool)
        :returns: None (nonetype)
        """
        if basis is None:
            return
        if warning is False:
            if (
                self.container_basis_one in basis
                or self.container_basis_two in basis
            ):
                method = "Container"
            elif self.byte_basis in basis:
                method = "Signature"
            elif self.xml_basis in basis:
                method = "XML"
            elif self.text_basis in basis:
                method = "Text"
            elif self.extension_only_two in basis:
                method = "Extension"
            else:
                method = ""
            iddata[self.FIELDMETHOD] = method
        if warning is True:
            if self.filename_only in basis:
                method = "Filename"
            elif self.extension_only_one in basis:
                method = "Extension"
            else:
                # warning comes after basis in SF report
                # posit: at this point anything else is not
                # really an identification at all
                method = "None"
            if self.FIELDMETHOD not in iddata:
                iddata[self.FIELDMETHOD] = method

    @staticmethod
    def getDirName(filepath):
        """Return the directory part of a path.

        Falls back to Windows path rules when the host rules find no
        directory but the path contains backslashes, mirroring the
        fallback in :meth:`getFileName`.

        :param filepath: Path as written in the report (string)
        :returns: Directory portion, possibly empty (string)
        """
        dirname = os.path.dirname(filepath)
        if not dirname and "\\" in filepath:
            dirname = ntpath.dirname(filepath)
        return dirname

    @staticmethod
    def getFileName(filepath):
        """Return the final component of a path.

        :param filepath: Path as written in the report (string)
        :returns: File name without any directory (string)
        """
        fname = os.path.basename(filepath)
        if len(fname) == len(filepath):
            # Retrieving filename probably didn't work... maybe windows
            # path.
            fname = ntpath.basename(filepath)
        return fname

    def adddirname(self, sfdata):
        """Add a ``directory`` entry to every file row.

        :param sfdata: Parsed report structure (dict)
        :returns: The same structure, updated in place (dict)
        """
        for row in sfdata[self.DICTFILES]:
            row[self.FIELDDIRNAME] = self.getDirName(row[self.FIELD_FILE_NAME])
        return sfdata

    def addfilename(self, sfdata):
        """Add a ``name`` entry (file name only) to every file row.

        :param sfdata: Parsed report structure (dict)
        :returns: The same structure, updated in place (dict)
        """
        for row in sfdata[self.DICTFILES]:
            row["name"] = self.getFileName(row[self.FIELD_FILE_NAME])
        return sfdata

    def addYear(self, sfdata):
        """Add a ``year`` entry derived from ``modified`` to every row.

        :param sfdata: Parsed report structure (dict)
        :returns: The same structure, updated in place (dict)
        """
        for row in sfdata[self.DICTFILES]:
            row[self.FIELDYEAR] = self.getYear(row["modified"])
        return sfdata

    def getYear(self, datestring):
        """Return the year for a date string via the date handler.

        :param datestring: Date text from the report (string)
        :returns: Whatever ``PyDateHandler.getYear`` returns
        """
        return self.pydate.getYear(datestring)

    def getContainers(self, id_, filedict):
        """Set the file ``type`` (and container type) from an identifier.

        :param id_: Identifier value from an ``id`` line (string)
        :param filedict: File dictionary to update (dict)
        :returns: None (nonetype)
        """
        # only set as File if and only if it isn't a Container
        # container overrides all...
        for container_type, container_id in self.containers.items():
            if container_id == id_:
                filedict[self.FIELDTYPE] = self.TYPE_CONTAINER
                filedict[self.FIELDCONTTYPE] = container_type
                return
        if filedict.get(self.FIELDTYPE) != self.TYPE_CONTAINER:
            filedict[self.FIELDTYPE] = self.TYPEFILE

    @staticmethod
    def addFileURI(filename):
        """Creates a file URI for a given path.

        :param filename: filename (string)
        :returns: filename as URI (string)
        """
        fname = filename.replace("\\", "/")
        # PY3 compatibility. Python 3 is given text: its pathname2url
        # is documented for str paths and the Windows implementation
        # cannot handle bytes. Only the Python 2 fallback needs the
        # UTF-8 encoded form.
        try:
            quoted = request.pathname2url(fname)
        except NameError:
            quoted = urllib.pathname2url(fname.encode("utf-8"))
        try:
            uri = parse.unquote(parse.urljoin("file:", quoted))
        except NameError:
            uri = urllib.unquote(urlparse.urljoin("file:", quoted))
        return uri

    def addContainerURI(self, container, filename):
        """Creates a container URI for a given path.

        :param container: container object (dict)
        :param filename: filename (string)
        :returns: A modified file URI with the attached container URI
            scheme. If the second arc URI variant the prefix is
            corrected to remove _1 suffix (string)
        """
        container_path = container[self.FIELD_FILE_NAME]
        # Mark the container path before the scheme is attached so the
        # scheme text can never be matched by the replacement.
        marked = filename.replace(container_path, container_path + "!")
        scheme = container[self.FIELDCONTTYPE]
        # Normalise the scheme only; a path that happens to contain
        # "arc_1" must be left untouched.
        if scheme == "arc_1":
            scheme = "arc"
        return scheme + ":" + marked

    @staticmethod
    def geturischeme(fname):
        """Return the scheme component of a URI.

        :param fname: URI to inspect (string)
        :returns: Scheme as parsed by ``urlparse`` (string)
        """
        try:
            return parse.urlparse(fname).scheme
        except NameError:
            return urlparse.urlparse(fname).scheme

    def addExt(self, sfdata):
        """Add an ``ext`` entry to every file row.

        Relies on the ``name`` entry written by :meth:`addfilename`.

        :param sfdata: Parsed report structure (dict)
        :returns: The same structure, updated in place (dict)
        """
        for row in sfdata[self.DICTFILES]:
            parts = row["name"].rsplit(".", 1)
            # no extension... leaves an empty string
            row[self.FIELDEXT] = parts[1] if len(parts) == 2 else ""
        return sfdata