-
Notifications
You must be signed in to change notification settings - Fork 1.1k
/
filesystem.py
617 lines (556 loc) · 23.1 KB
/
filesystem.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
import os
import logging
from .plugin import SimStatePlugin
from ..storage.file import SimFile
from ..errors import SimMergeError
from ..misc.ux import once
# Module-level logger named after this module; the widen() methods below use it to emit their warnings.
l = logging.getLogger(name=__name__)
class SimFilesystem(SimStatePlugin): # pretends links don't exist
    """
    angr's emulated filesystem. Available as state.fs.
    When constructing, all parameters are optional.

    Paths are handled internally as bytes. Files that are not serviced by a mountpoint are kept in a flat
    mapping keyed by their normalized absolute path.

    :param files:       A mapping from filepath to SimFile
    :param pathsep:     The character used to separate path elements, default forward slash.
    :param cwd:         The path of the current working directory to use
    :param mountpoints: A mapping from filepath to SimMount

    :ivar pathsep:      The current pathsep
    :ivar cwd:          The current working directory
    :ivar unlinks:      A list of unlink operations, tuples of filename and simfile. Be careful, this list is
                        shallow-copied from successor to successor, so don't mutate anything in it without copying.
    """
    def __init__(self, files=None, pathsep=None, cwd=None, mountpoints=None):
        super(SimFilesystem, self).__init__()

        if files is None: files = {}
        if pathsep is None: pathsep = b'/'
        if cwd is None: cwd = pathsep
        if mountpoints is None: mountpoints = {}

        self.pathsep = pathsep
        self.cwd = cwd
        self._unlinks = []
        self._files = {}
        self._mountpoints = {}

        # Mountpoints go in first so that inserting the files below is routed through them.
        for fname in mountpoints:
            self.mount(fname, mountpoints[fname])

        for fname in files:
            self.insert(fname, files[fname])

    @property
    def unlinks(self):
        """
        The list of ``(path, simfile)`` tuples recorded by :meth:`delete`. Every access re-attaches the
        current state to each unlinked file before returning the underlying list.
        """
        for _, f in self._unlinks:
            f.set_state(self.state)
        return self._unlinks

    def set_state(self, state):
        """
        Attach ``state`` to this plugin and propagate it to every file and every mountpoint.
        """
        super(SimFilesystem, self).set_state(state)
        for fname in self._files:
            self._files[fname].set_state(state)
        for fname in self._mountpoints:
            self._mountpoints[fname].set_state(state)

    @SimStatePlugin.memo
    def copy(self, memo):
        """
        Return a new filesystem whose files and mountpoints are copies of this one's. The unlink list is
        copied shallowly: the new list holds the very same tuples.
        """
        o = SimFilesystem(
            files={k: v.copy(memo) for k, v in self._files.items()},
            pathsep=self.pathsep,
            cwd=self.cwd,
            mountpoints={k: v.copy(memo) for k, v in self._mountpoints.items()}
        )
        o._unlinks = list(self._unlinks)
        return o

    def merge(self, others, merge_conditions, common_ancestor=None):
        """
        Merge the filesystems of ``others`` into this one.

        :param others:              A list of the other SimFilesystem instances to merge in
        :param merge_conditions:    Passed through unchanged to the merge of each mountpoint and file
        :param common_ancestor:     Optionally, the filesystem of a common ancestor state
        :return:                    The accumulated (or-ed) results of the nested merge calls
        :raises SimMergeError:      If the cwds, the mountpoint sets or the unlink histories differ
        """
        merging_occurred = False

        for o in others:
            if o.cwd != self.cwd:
                raise SimMergeError("Can't merge filesystems with disparate cwds")
            if len(o._mountpoints) != len(self._mountpoints):
                raise SimMergeError("Can't merge filesystems with disparate mountpoints")
            # the unlink lists are shallow copies of one another, so identical histories share tuple identities
            if list(map(id, o.unlinks)) != list(map(id, self.unlinks)):
                raise SimMergeError("Can't merge filesystems with disparate unlinks")

        for fname in self._mountpoints:
            try:
                subdeck = [o._mountpoints[fname] for o in others]
            except KeyError:
                raise SimMergeError("Can't merge filesystems with disparate file sets")

            if common_ancestor is not None and fname in common_ancestor._mountpoints:
                common_mp = common_ancestor._mountpoints[fname]
            else:
                common_mp = None

            merging_occurred |= self._mountpoints[fname].merge(subdeck, merge_conditions, common_ancestor=common_mp)

        # this is a little messy
        deck = [self] + others
        all_files = set.union(*(set(o._files.keys()) for o in deck))
        for fname in all_files:
            subdeck = [o._files[fname] if fname in o._files else None for o in deck]
            representative = next(x for x in subdeck if x is not None)
            for i, v in enumerate(subdeck):
                if v is None:
                    # NOTE(review): this *calls* an existing file object to obtain a stand-in for the states
                    # that lack the file - confirm that SimFile instances are actually callable.
                    subdeck[i] = representative()
                    if i == 0:
                        self._files[fname] = subdeck[i]

            if common_ancestor is not None and fname in common_ancestor._files:
                common_simfile = common_ancestor._files[fname]
            else:
                common_simfile = None

            merging_occurred |= subdeck[0].merge(subdeck[1:], merge_conditions, common_ancestor=common_simfile)

        return merging_occurred

    def widen(self, others): # pylint: disable=unused-argument
        """
        Widening is not implemented: this only logs a warning, guarded by ``once('fs_widen_warning')``.
        """
        if once('fs_widen_warning'):
            l.warning("Filesystems can't be widened yet - beware unsoundness")

    def _normalize_path(self, path):
        """
        Takes a path and returns a simple absolute path as a list of directories from the root

        ``str`` input is encoded to bytes, everything from the first NUL byte on is discarded, and relative
        paths are resolved against the cwd. Empty and ``.`` components are dropped; ``..`` removes the
        preceding component (and is simply dropped at the root).
        """
        if isinstance(path, str):
            path = path.encode()
        path = path.split(b'\0')[0]

        if path[0:1] != self.pathsep:
            path = self.cwd + self.pathsep + path
        keys = path.split(self.pathsep)
        i = 0
        while i < len(keys):
            if keys[i] == b'':
                keys.pop(i)
            elif keys[i] == b'.':
                keys.pop(i)
            elif keys[i] == b'..':
                keys.pop(i)
                if i != 0:
                    # also drop the parent component, then step back onto whatever took its place
                    keys.pop(i-1)
                    i -= 1
            else:
                i += 1
        return keys

    def _join_chunks(self, keys):
        """
        Takes a list of directories from the root and joins them into a string path
        """
        return self.pathsep + self.pathsep.join(keys)

    def chdir(self, path):
        """
        Changes the current directory to the given path
        """
        self.cwd = self._join_chunks(self._normalize_path(path))

    def get(self, path):
        """
        Get a file from the filesystem. Returns a SimFile or None.
        """
        mountpoint, chunks = self.get_mountpoint(path)

        if mountpoint is None:
            return self._files.get(self._join_chunks(chunks))
        else:
            return mountpoint.get(chunks)

    def insert(self, path, simfile):
        """
        Insert a file into the filesystem. Returns whether the operation was successful.
        """
        if self.state is not None:
            simfile.set_state(self.state)
        mountpoint, chunks = self.get_mountpoint(path)

        if mountpoint is None:
            self._files[self._join_chunks(chunks)] = simfile
            return True
        else:
            return mountpoint.insert(chunks, simfile)

    def delete(self, path):
        """
        Remove a file from the filesystem. Returns whether the operation was successful.

        This will add a ``fs_unlink`` event with the path of the file and also the index into the `unlinks` list.
        """
        mountpoint, chunks = self.get_mountpoint(path)
        apath = self._join_chunks(chunks)

        if mountpoint is None:
            try:
                simfile = self._files.pop(apath)
            except KeyError:
                return False
            else:
                self.state.history.add_event('fs_unlink', path=apath, unlink_idx=len(self.unlinks))
                self.unlinks.append((apath, simfile))
                return True
        else:
            return mountpoint.delete(chunks)

    def mount(self, path, mount):
        """
        Add a mountpoint to the filesystem.
        """
        self._mountpoints[self._join_chunks(self._normalize_path(path))] = mount

    def unmount(self, path):
        """
        Remove a mountpoint from the filesystem.

        :raises KeyError:   If nothing is mounted at the given path
        """
        del self._mountpoints[self._join_chunks(self._normalize_path(path))]

    def get_mountpoint(self, path):
        """
        Look up the mountpoint servicing the given path.

        The proper prefixes of the normalized path are tried from the longest down to the root, so the most
        specific mountpoint wins. A mountpoint entry of ``None`` stops the search, and the path is then
        treated as not being serviced by any mountpoint.

        :return: A tuple of the mount and a list of path elements traversing from the mountpoint to the specified file.
                 If no mountpoint services the path, the tuple is ``(None, <all path elements>)``.
        """
        path_chunks = self._normalize_path(path)
        # `split` is the length of the candidate prefix. Indexing from the front (rather than with a negative
        # offset from the back) keeps the longest-first order and makes the root case (split == 0) explicit.
        for split in range(len(path_chunks) - 1, -1, -1):
            partial_path = self._join_chunks(path_chunks[:split])
            if partial_path in self._mountpoints:
                mountpoint = self._mountpoints[partial_path]
                if mountpoint is None:
                    break
                return mountpoint, path_chunks[split:]

        return None, path_chunks
# Register SimFilesystem as the default plugin under the name 'fs'
# (presumably this is what makes it reachable as state.fs - confirm against SimStatePlugin.register_default).
SimFilesystem.register_default('fs')
class SimMount(SimStatePlugin):
    """
    Base class for "mount points" in angr's simulated filesystem.

    Derive from this class and hand an instance to the filesystem in order to intercept every file lookup,
    creation and deletion that happens below the mountpoint. Because a mount is itself a SimStatePlugin,
    subclasses will usually want to provide set_state, copy, merge, etc. as well.
    """
    def get(self, _path_elements):
        """
        Override this to service file lookups.

        :param _path_elements:  A list of path elements traversing from the mountpoint to the file
        :return:                A SimFile, or None
        """
        raise NotImplementedError()

    def insert(self, _path_elements, simfile):
        """
        Override this to service file creation.

        :param _path_elements:  A list of path elements traversing from the mountpoint to the file
        :param simfile:         The file to insert
        :return:                A bool indicating whether the insert occurred
        """
        raise NotImplementedError()

    def delete(self, _path_elements):
        """
        Override this to service file deletion.

        :param _path_elements:  A list of path elements traversing from the mountpoint to the file
        :return:                A bool indicating whether the delete occurred
        """
        raise NotImplementedError()
class SimHostFilesystem(SimMount):
    """
    Simulated mount that makes some piece from the host filesystem available to the guest.

    Host files are read lazily on first lookup and kept in ``cache``, keyed by their path relative to the
    mount. Deletions are only recorded in ``deleted_list``; nothing in this class removes or writes host files.

    :param str host_path:       The path on the host to mount
    :param str pathsep:         The host path separator character, default os.path.sep
    """
    def __init__(self, host_path, pathsep=os.path.sep):
        super(SimHostFilesystem, self).__init__()
        self.host_path = host_path
        self.pathsep = pathsep
        self.cache = {}             # relative path (str) -> SimFile
        self.deleted_list = set()   # relative paths the guest has deleted

    def get(self, path_elements):
        """
        Look up a file below the mount, loading it from the host on first access.

        :param path_elements:   A list of bytes path elements traversing from the mountpoint to the file
        :return:                The cached SimFile, or None if the file was deleted or cannot be read on the host
        """
        path = self.pathsep.join(x.decode() for x in path_elements)
        if path in self.deleted_list:
            return None
        if path not in self.cache:
            host_path = os.path.join(self.host_path, path)
            simfile = self._load_file(host_path)
            if simfile is None:
                return None
            self.insert(path_elements, simfile)

        return self.cache[path]

    @staticmethod
    def _load_file(path):
        """
        Read the host file at ``path`` completely and wrap its content in a SimFile.

        :return:    The SimFile, or None if opening or reading the file raised an OSError
        """
        try:
            with open(path, 'rb') as fp:
                content = fp.read()
        except OSError:
            return None
        else:
            return SimFile(name='file://' + path, content=content, size=len(content))

    def insert(self, path_elements, simfile):
        """
        Store ``simfile`` in the cache under the given path and clear any deletion record for it.

        :return:    Always True
        """
        path = self.pathsep.join(x.decode() for x in path_elements)
        simfile.set_state(self.state)
        self.cache[path] = simfile
        self.deleted_list.discard(path)
        return True

    def delete(self, path_elements):
        """
        Record the path as deleted and drop it from the cache.

        :return:    True if the file was in the cache, False otherwise
        """
        path = self.pathsep.join(x.decode() for x in path_elements)
        self.deleted_list.add(path)
        return self.cache.pop(path, None) is not None

    @SimStatePlugin.memo
    def copy(self, memo):
        """
        Return a new mount of the same host path with copied cache entries and a copied deletion set.
        """
        x = SimHostFilesystem(self.host_path, pathsep=self.pathsep)
        x.cache = {fname: self.cache[fname].copy(memo) for fname in self.cache}
        x.deleted_list = set(self.deleted_list)
        return x

    def set_state(self, state):
        """
        Attach ``state`` to this mount and to every cached file.
        """
        super(SimHostFilesystem, self).set_state(state)
        for fname in self.cache:
            self.cache[fname].set_state(state)

    def merge(self, others, merge_conditions, common_ancestor=None):
        """
        Merge the cached files of ``others`` into this mount.

        :param others:              A list of the other SimHostFilesystem instances to merge in
        :param merge_conditions:    Passed through unchanged to the merge of each file
        :param common_ancestor:     Optionally, the corresponding mount of a common ancestor state
        :return:                    The accumulated (or-ed) results of the per-file merge calls
        :raises SimMergeError:      If host paths, path separators or deletion sets differ, or if a file that
                                    is missing from one of the caches cannot be loaded from the host
        """
        merging_occurred = False

        for o in others:
            if o.host_path != self.host_path:
                raise SimMergeError("Can't merge SimHostFilesystems with disparate host paths")
            if o.pathsep != self.pathsep:
                raise SimMergeError("Can't merge SimHostFilesystems with disparate pathseps")
            if o.deleted_list != self.deleted_list:
                raise SimMergeError("Can't merge SimHostFilesystems with disparate deleted files")

        deck = [self] + others
        # The per-mount file store is `cache`; this class has no `_files` attribute.
        all_files = set.union(*(set(o.cache) for o in deck))
        for fname in all_files:
            subdeck = []
            basecase = None
            for o in deck:
                simfile = o.cache.get(fname)
                if simfile is None:
                    # This state never touched the file: stand in the pristine host copy (loaded at most once).
                    if basecase is None:
                        basecase = self._load_file(os.path.join(self.host_path, fname))
                    if basecase is None:
                        # e.g. a file the guest created that has no counterpart on the host
                        raise SimMergeError("Can't merge SimHostFilesystems when a file missing from one "
                                            "state cannot be loaded from the host")
                    # NOTE(review): when `o is self` the stand-in is not stored in self.cache, so the merged
                    # content is not retained by this mount - confirm whether that is intended.
                    simfile = basecase
                subdeck.append(simfile)

            if common_ancestor is not None and fname in common_ancestor.cache:
                common_simfile = common_ancestor.cache[fname]
            else:
                common_simfile = None

            merging_occurred |= subdeck[0].merge(subdeck[1:], merge_conditions, common_ancestor=common_simfile)

        return merging_occurred

    def widen(self, others): # pylint: disable=unused-argument
        """
        Widening is not implemented: this only logs a warning, guarded by ``once('host_fs_widen_warning')``.
        """
        if once('host_fs_widen_warning'):
            l.warning("The host filesystem mount can't be widened yet - beware unsoundness")
#class SimDirectory(SimStatePlugin):
# """
# This is the base class for directories in angr's emulated filesystem. An instance of this class or a subclass will
# be found as ``state.fs``, representing the root of the filesystem.
#
# :ivar files: A mapping from filename to file that this directory contains.
# """
# def __init__(self, files=None, writable=True, parent=None, pathsep='/'):
# super(SimDirectory, self).__init__()
# self.files = files
# self.writable = writable
# self.parent = parent if parent is not None else self
# self.pathsep = pathsep
# self.files['.'] = self
# self.files['..'] = self.parent
#
# def __len__(self):
# return len(self.files)
#
# def lookup(self, path, writing=False):
# """
# Look up the file or directory at the end of the given path.
# This method should be called on the current working directory object.
#
# :param str path: The path to look up
# :param bool writing: Whether the operation desired requires write permissions
# :returns: The SimDirectory or SimFile object specified, or None if not found, or False if writing
# was requested and the target is nonwritable
# """
# if len(path) == 0:
# return None
# if path[0] == self.pathsep:
# # lookup the filesystem root
# root = self
# while root.parent is not root:
# root = root.parent
# return root._lookup(path[1:], writing)
# else:
# return self._lookup(path, writing)
#
# def _lookup(self, path, writing):
# while path.startswith(self.pathsep):
# path = path[1:]
#
# if len(path) == 0:
# if writing and not self.writable:
# return False
# return self
#
# for fname, simfile in self.files.items():
# if path.startswith(fname):
# if len(path) == len(fname):
# if writing and not simfile.writable:
# return False
# return simfile
# elif path[len(fname)] == self.pathsep:
# if isinstance(simfile, SimDirectory):
# return simfile._lookup(path[len(fname)+1:])
# else: # TODO: symlinks
# return None
#
# return None
#
# def insert(self, path, simfile):
# """
# Add a file to the filesystem.
# This method should be called on the current working directory object.
#
# :param str path: The path to insert the new file at
# :param simfile: The new file or directory
# :returns: A boolean indicating whether the operation succeeded
# """
# while len(path) > 1 and path[-1] == self.pathsep:
# path = path[:-1]
#
# if self.pathsep not in path:
# if path in self.files:
# return False
# if isinstance(simfile, SimDirectory):
# if simfile.parent is simfile:
# simfile.parent = self
# simfile.pathsep = self.pathsep
# else:
# l.error("Trying to add directory to filesystem which already has a parent")
#
# self.files[path] = simfile
# simfile.set_state(self.state)
# return True
# else:
# lastsep = path.rindex(self.pathsep) + 1
# head, tail = path[:lastsep], path[lastsep:]
# parent = self.lookup(head, True)
#
# if not parent:
# return False
# return parent.insert(tail, simfile)
#
# def remove(self, path):
# """
# Remove a file from the filesystem. If the target is a directory, the directory must be empty.
# This method should be called on the current working directory object.
#
# :param str path: The path to remove the file at
# :returns: A boolean indicating whether the operation succeeded
# """
# while len(path) > 1 and path[-1] == self.pathsep:
# # TODO: when symlinks exist this will need to be fixed to delete the target of the
# # symlink instead of the link itself
# path = path[:-1]
#
# if self.pathsep not in path:
# if path in ('.', '..'):
# return False
# if path not in self.files:
# return False
# if isinstance(self.files[path], SimDirectory) and len(self.files[path]) != 2:
# return False
#
# del self.files[path]
# return True
# else:
# lastsep = path.rindex(self.pathsep) + 1
# head, tail = path[:lastsep], path[lastsep:]
# parent = self.lookup(head, True)
#
# if not parent:
# return False
# return parent.remove(tail)
#
# @SimStatePlugin.memo
# def copy(self, memo):
# return SimDirectory(
# files={x: y.copy(memo) for x, y in self.files.items()},
# writable=self.writable,
# parent=self.parent.copy(memo),
# pathsep=self.pathsep)
#
# def merge(self, others, conditions, ancestor=None):
# new_files = {path: (simfile, [], []) for path, simfile in self.files.items() if path not in ('.', '..')}
# for other, condition in zip(others, conditions):
# if type(other) is not type(self):
# raise SimMergeError("Can't merge filesystem elements of disparate types")
# for path, simfile in other.files.items():
# if path in ('.', '..'):
# continue
# if path not in new_files:
# l.warning("Cannot represent the conditional creation of files")
# new_files[path] = (simfile, [], [])
# else:
# new_files[path][1].append(simfile)
# new_files[path][2].append(condition)
#
# for k in new_files:
# new_files[k][0].merge(new_files[k][1], new_files[k][2], ancestor)
# new_files[k] = new_files[k][0]
# new_files['.'] = self
# new_files['..'] = self.parent
# self.files = new_files
#
# def widen(self, others):
# new_files = {path: [simfile] for path, simfile in self.files.items() if path not in ('.', '..')}
# for other in others:
# if type(other) is not type(self):
# raise SimMergeError("Can't merge filesystem elements of disparate types")
# for path, simfile in other.files.items():
# if path in ('.', '..'):
# continue
# if path not in new_files:
# new_files[path] = [simfile]
# else:
# new_files[path].append(simfile)
#
# for k in new_files:
# new_files[k][0].widen(new_files[k][1:])
# new_files[k] = new_files[k][0]
# new_files['.'] = self
# new_files['..'] = self.parent
# self.files = new_files
#
#class SimDirectoryConcrete(SimDirectory):
# """
# A SimDirectory that forwards its requests to the host filesystem
#
# :param host_path: The path on the host filesystem to provide
# :param writable: Whether to allow mutation of the host filesystem by the guest
# """
# def __init__(self, host_path, writable=False, pathsep='/', host_root=None, parent=None):
# super(SimConcreteDirectory, self).__init__(files={}, writable=writable, parent=parent, pathsep=pathsep)
# self.host_path = os.path.realpath(host_path)
# self.host_root = self.host_path if host_root is None else host_root
#
# def _lookup(self, path, writing):
# partial_path = self.host_path
# for i, pathkey in enumerate(path.split(self.pathsep)):
# if partial_path == self.host_root and pathkey == '..':
# target = self.pathsep.join(path.split(self.pathsep)[i+1:])
# return self.parent._lookup(target, writing)
# if not os.path.isdir(partial_path):
# return None
#
# partial_path = os.path.realpath(partial_path + self.pathsep + pathkey)
#
# if writing and not self.writable:
# return False
#
# if os.path.isdir(partial_path):
# f = SimDirectoryConcrete(host_path=partial_path, writable=self.writable, host_root=self.host_root, parent=self.parent)
# f.set_state(self.state)
# return f
# elif os.path.isfile(partial_path):
# try:
# f = SimFileConcrete(host_path=partial_path, writable=self.writable)
# f.set_state(self.state)
# return f
# except OSError:
# return None
# else:
# raise SimFilesystemError("Can't handle something other than a file or directory in a concrete filesystem")
#
# def insert(self, path, simfile):
# if self.pathsep in path:
# return super(SimDirectoryConcrete, self).insert(path, simfile)
# else:
# fullpath = os.path.join(self.host_path, path)
# if os.path.exists(fullpath):
# return False
# with open(fullpath, 'w') as fp:
# fp.write(simfile.concretize())
# return True
#
# def remove(self, path):
# if self.pathsep in path:
# return super(SimDirectoryConcrete, self).remove(path)
# else:
# fullpath = os.path.join(self.host_path, path)
# if not os.path.exists(fullpath):
# return False
# if os.path.isdir(fullpath):
# try:
# os.rmdir(fullpath)
# except OSError:
# return False
# return True
# elif os.path.isfile(fullpath):
# try:
# os.unlink(fullpath)
# except OSError:
# return False
# return True
# else:
# raise SimFilesystemError("Can't handle anything but files and directories in concrete filesystem")
#
#SimDirectory.register_default('fs')