Skip to content

Commit

Permalink
Merge branch 'main' into parallels
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed Apr 26, 2023
2 parents f77e696 + 785d0c9 commit 44bb782
Show file tree
Hide file tree
Showing 46 changed files with 807 additions and 341 deletions.
2 changes: 1 addition & 1 deletion dissect/target/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ def open(item: Union[list, str, BinaryIO, Path], *args, **kwargs):
):
return container(item, *args, **kwargs)
except ImportError as e:
log.warning("Failed to import %s", container)
log.info("Failed to import %s", container)
log.debug("", exc_info=e)
except Exception as e:
raise ContainerError(f"Failed to open container {item}", cause=e)
Expand Down
2 changes: 1 addition & 1 deletion dissect/target/filesystem.py
Original file line number Diff line number Diff line change
Expand Up @@ -1383,7 +1383,7 @@ def open(fh: BinaryIO, *args, **kwargs) -> Filesystem:
instance.volume = fh
return instance
except ImportError as e:
log.warning("Failed to import %s", filesystem)
log.info("Failed to import %s", filesystem)
log.debug("", exc_info=e)

raise FilesystemError(f"Failed to open filesystem for {fh}")
Expand Down
208 changes: 79 additions & 129 deletions dissect/target/filesystems/exfat.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from __future__ import annotations

import stat
from typing import BinaryIO, Iterator, Optional

from dissect.fat import exfat
from dissect.util.stream import RunlistStream
Expand All @@ -8,143 +11,114 @@
from dissect.target.filesystem import Filesystem, FilesystemEntry
from dissect.target.helpers import fsutil

ExfatFileTree = tuple[exfat.c_exfat.FILE, dict[str, Optional["ExfatFileTree"]]]


class ExfatFilesystem(Filesystem):
__fstype__ = "exfat"

def __init__(self, fh=None, *args, **kwargs):
def __init__(self, fh: BinaryIO, *args, **kwargs):
super().__init__(fh, case_sensitive=False, alt_separator="\\", *args, **kwargs)
self.exfat = exfat.ExFAT(fh)
self.cluster_size = self.exfat.cluster_size

@staticmethod
def _detect(fh):
def _detect(fh: BinaryIO) -> bool:
return fh.read(11)[3:] == b"EXFAT "

def get(self, path):
def get(self, path: str) -> ExfatFilesystemEntry:
"""Returns a ExfatFilesystemEntry object corresponding to the given pathname"""
try:
dirname, file = fsutil.split(path, alt_separator=self.alt_separator)
if path == dirname and not file:
entry = self._gen_dict_extract(dirname)
elif dirname and file:
entry = self._gen_dict_extract(dirname)[1][file]
else:
entry = self._gen_dict_extract(file)
return ExfatFilesystemEntry(self, entry, file)
except TypeError as e:
raise FileNotFoundError(path, cause=e)
except KeyError as e:
raise FileNotFoundError(path, cause=e)

def open(self, path):
"""Return file handle (file like object)"""
return self.get(path).open()

def iterdir(self, path):
"""List the directory contents of a directory. Returns a generator of strings."""
try:
files = self.exfat.files.keys() if path == "/" else self._gen_dict_extract(path)[1].keys()
except AttributeError as e:
raise NotADirectoryError(path, cause=e)
except TypeError as e:
raise FileNotFoundError(path, cause=e)

for file in files:
yield file

def scandir(self, path):
"""List the directory contents of a directory. Returns a generator of filesystem entries."""

try:
paths = []
files = self.exfat.files.keys() if path == "/" else self._gen_dict_extract(path)[1].keys()
except AttributeError as e:
raise NotADirectoryError(path, cause=e)
except TypeError as e:
raise FileNotFoundError(path, cause=e)

for file in files:
if path == "/":
yield self.get(file)
else:
paths.append(fsutil.join(path, file, alt_separator=self.alt_separator))
return ExfatFilesystemEntry(self, path, self._get_entry(path))

for path in paths:
yield self.get(path)
def _get_entry(self, path: str, root: Optional[ExfatFileTree] = None) -> ExfatFileTree:
dirent = root if root is not None else self.exfat.files["/"]

def stat(self, path):
"""Returns POSIX file status results"""
return self.get(path).stat()
# Programmatically we will often use the `/` separator, so replace it
# with the native path separator of exFAT `/` is an illegal character
# in exFAT filenames, so it's safe to replace
parts = path.replace("/", "\\").split("\\")

def _gen_dict_extract(self, key, var=None):
var = self.exfat.files if var is None else var
for part in parts:
if not part:
continue

if hasattr(var, "items"):
for k, v in var.items():
if k == key:
return v
if isinstance(v, dict):
for result in self._gen_dict_extract(key, v):
return result
file_tree = dirent[1]
if file_tree is None:
raise NotADirectoryError(f"Not a directory: {path}")

for entry_name, entry_file_tree in file_tree.items():
if entry_name.upper() == part.upper():
dirent = entry_file_tree
break
else:
raise FileNotFoundError(f"File not found: {path}")

return dirent


class ExfatFilesystemEntry(FilesystemEntry):
def __init__(self, fs, entry=None, path=None):
def __init__(
self,
fs: ExfatFilesystem,
path: str,
entry: ExfatFileTree,
):
super().__init__(fs, path, entry)
self.fs = fs
self.entry = entry[0]
self.files = entry[1]
self.name = path
self.size = self.entry.stream.data_length
self.cluster = self.entry.stream.location
self.size = self.entry[0].stream.data_length
self.cluster = self.entry[0].stream.location

def get(self, path: str) -> ExfatFilesystemEntry:
"""Get a filesystem entry relative from the current one."""
full_path = fsutil.join(self.path, path, alt_separator=self.fs.alt_separator)
return ExfatFilesystemEntry(self.fs, full_path, self.fs._get_entry(path, self.entry))

def open(self) -> BinaryIO:
"""Returns file handle (file like object)"""
if self.entry[0].stream.flags.not_fragmented:
runlist = self.fs.exfat.runlist(self.cluster, True, self.size)
else:
runlist = self.fs.exfat.runlist(self.cluster, False)
fh = RunlistStream(self.fs.exfat.filesystem, runlist, self.size, self.fs.cluster_size)
return fh

def __repr__(self):
return repr(self.entry)
def _iterdir(self) -> Iterator[tuple[str, ExfatFileTree]]:
if not self.is_dir():
raise NotADirectoryError(self.path)

for entry_name, entry_file_tree in self.entry[1].items():
if entry_name in (".", ".."):
continue
yield (entry_name, entry_file_tree)

def iterdir(self) -> Iterator[str]:
"""List the directory contents of a directory. Returns a generator of strings."""
for entry_name, _ in self._iterdir():
yield entry_name

def scandir(self) -> Iterator[ExfatFilesystemEntry]:
"""List the directory contents of this directory. Returns a generator of filesystem entries."""
for entry_name, entry_file_tree in self._iterdir():
path = fsutil.join(self.path, entry_name, alt_separator=self.fs.alt_separator)
yield ExfatFilesystemEntry(self.fs, path, entry_file_tree)

def is_symlink(self):
def is_symlink(self) -> bool:
"""Return whether this entry is a link."""
return False

def is_dir(self):
def is_dir(self) -> bool:
"""Return whether this entry is a directory. Resolves symlinks when possible."""
return bool(self.entry.metadata.attributes.directory)
return bool(self.entry[0].metadata.attributes.directory)

def is_file(self):
def is_file(self) -> bool:
"""Return whether this entry is a file. Resolves symlinks when possible."""
return not self.is_dir()

def get(self, path):
"""Get a filesystem entry relative from the current one."""
if self.is_dir():
entry = self.files[path]
return ExfatFilesystemEntry(self.fs, entry, path)
else:
raise NotADirectoryError(self.name)

def iterdir(self):
"""List the directory contents of a directory. Returns a generator of strings."""
if self.is_dir():
files = self.files.keys()

for file in files:
yield file
else:
raise NotADirectoryError(self.name)
def stat(self) -> fsutil.stat_result:
return self.lstat()

def scandir(self):
"""List the directory contents of this directory. Returns a generator of filesystem entries."""
if self.is_dir():
files = self.files.keys()

for file in files:
yield self.get(file)
else:
raise NotADirectoryError(self.name)

def stat(self):
def lstat(self) -> fsutil.stat_result:
"""Return the stat information of this entry."""
fe = self.entry
fe = self.entry[0]
size = fe.stream.data_length
addr = fe.stream.location

Expand Down Expand Up @@ -175,27 +149,3 @@ def stat(self):
ctime.timetuple().timestamp(),
]
return fsutil.stat_result(st_info)

def open(self):
"""Returns file handle (file like object)"""
if self.entry.stream.flags.not_fragmented:
runlist = self.fs.exfat.runlist(self.cluster, True, self.size)
else:
runlist = self.fs.exfat.runlist(self.cluster, False)
fh = RunlistStream(self.fs.exfat.filesystem, runlist, self.size, self.fs.cluster_size)
return fh

def readlink(self):
return TypeError()

def readlink_ext(self):
return TypeError()

def lstat(self):
return TypeError()

def attr(self):
return TypeError()

def lattr(self):
return TypeError()
10 changes: 7 additions & 3 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,9 @@ def register(module_name: str, class_name: str, internal: bool = True) -> None:
LOADERS_BY_SCHEME[module_name] = loader


def find_loader(item: Path, parsed_path: Optional[urllib.parse.ParseResult] = None) -> Optional[Loader]:
def find_loader(
item: Path, parsed_path: Optional[urllib.parse.ParseResult] = None, fallbacks: list[Loader] = [DirLoader]
) -> Optional[Loader]:
"""Finds a :class:`Loader` class for the specific ``item``.
This searches for a specific :class:`Loader` classs that is able to load a target pointed to by ``item``.
Expand All @@ -134,6 +136,7 @@ def find_loader(item: Path, parsed_path: Optional[urllib.parse.ParseResult] = No
Args:
item: The target path to load.
fallbacks: Fallback loaders to try.
Returns:
A :class:`Loader` class for the specific target if one exists.
Expand All @@ -142,12 +145,12 @@ def find_loader(item: Path, parsed_path: Optional[urllib.parse.ParseResult] = No
if loader := LOADERS_BY_SCHEME.get(parsed_path.scheme):
return loader

for loader in LOADERS + [DirLoader]:
for loader in LOADERS + fallbacks:
try:
if loader.detect(item):
return loader
except ImportError as exception:
log.warning("Failed to import %s", loader)
log.info("Failed to import %s", loader)
log.debug("", exc_info=exception)


Expand Down Expand Up @@ -177,6 +180,7 @@ def open(item: Union[str, Path], *args, **kwargs):
register("asdf", "AsdfLoader")
register("tar", "TarLoader")
register("vmx", "VmxLoader")
register("vmwarevm", "VmwarevmLoader")
register("hyperv", "HyperVLoader")
register("pvs", "PvsLoader")
register("pvm", "PvmLoader")
Expand Down
27 changes: 14 additions & 13 deletions dissect/target/loaders/asdf.py
Original file line number Diff line number Diff line change
@@ -1,33 +1,34 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from dissect.evidence import AsdfSnapshot
from dissect.evidence.asdf.asdf import IDX_METADATA

from dissect.target.containers.asdf import AsdfContainer
from dissect.target.helpers import fsutil
from dissect.target.filesystems.tar import TarFilesystem
from dissect.target.loader import Loader
from dissect.target.loaders.tar import TarFile

if TYPE_CHECKING:
from dissect.target import Target


class AsdfLoader(Loader):
METADATA_PREFIX = "$asdf$"

def __init__(self, path, **kwargs):
def __init__(self, path: Path, **kwargs):
path = path.resolve()

super().__init__(path)
self.asdf = AsdfSnapshot(open(path, "rb"))

@staticmethod
def detect(path):
def detect(path) -> bool:
return path.suffix.lower() == ".asdf"

def map(self, target):
def map(self, target: Target) -> None:
for disk in self.asdf.disks():
target.disks.add(AsdfContainer(disk))

overlay = target.fs.add_layer()
for member in self.asdf.metadata.members():
if member.isdir():
continue

path = fsutil.join(self.METADATA_PREFIX, member.name)
entry = TarFile(overlay, path, member.name, self.asdf.metadata.tar)
overlay.map_file_entry(entry.path, entry)
target.fs.mount(self.METADATA_PREFIX, TarFilesystem(self.asdf.open(IDX_METADATA)))
4 changes: 0 additions & 4 deletions dissect/target/loaders/cb.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,6 @@ def __init__(self, path, **kwargs):
def detect(path):
return urlparse(str(path)).scheme == "cb"

@staticmethod
def find_all(path):
yield path

def map(self, target):
for drive in self.session.session_data["drives"]:
cbfs = CbFilesystem(self.cb, self.sensor, self.session, drive)
Expand Down
9 changes: 6 additions & 3 deletions dissect/target/loaders/raw.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
from pathlib import Path

from dissect.target import container
from dissect.target.loader import Loader
from dissect.target.target import Target


class RawLoader(Loader):
@staticmethod
def detect(path):
return True
def detect(path: Path) -> bool:
return not path.is_dir()

def map(self, target):
def map(self, target: Target) -> None:
target.disks.add(container.open(self.path))
1 change: 1 addition & 0 deletions dissect/target/loaders/targetd.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,3 +140,4 @@ def command_runner(link: str, targetd: Client):
targetd.easy_connect_remoting(remoting, link, caller.peers)
func = getattr(targetd.rpcs, targetd.command)
caller.output = list(func())
targetd.close()
Loading

0 comments on commit 44bb782

Please sign in to comment.