Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update VelociraptorLoader based on version 0.7.0 #358

Merged
merged 14 commits into from
Aug 18, 2023
Merged
16 changes: 12 additions & 4 deletions dissect/target/helpers/loaderutil.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import logging
import re
import urllib
from os import PathLike
from pathlib import Path
from typing import Optional, Tuple, Union
from typing import BinaryIO, Optional, Union

from dissect.target.exceptions import FileNotFoundError
from dissect.target.filesystem import Filesystem
from dissect.target.filesystems.ntfs import NtfsFilesystem

log = logging.getLogger(__name__)


def add_virtual_ntfs_filesystem(
target,
Expand Down Expand Up @@ -38,17 +42,21 @@
fs.ntfs = ntfs.ntfs


def _try_open(fs, path):
def _try_open(fs: Filesystem, path: str) -> BinaryIO:
paths = [path] if not isinstance(path, list) else path

for path in paths:
try:
return fs.open(path)
path = fs.get(path)
if path.stat().st_size > 0:
return path.open()
else:
log.warning("File is empty and will be skipped: %s", path)

Check warning on line 54 in dissect/target/helpers/loaderutil.py

View check run for this annotation

Codecov / codecov/patch

dissect/target/helpers/loaderutil.py#L54

Added line #L54 was not covered by tests
except FileNotFoundError:
pass


def extract_path_info(path: Union[str, Path]) -> Tuple[Path, Optional[urllib.parse.ParseResult]]:
def extract_path_info(path: Union[str, Path]) -> tuple[Path, Optional[urllib.parse.ParseResult]]:
"""
Extracts a ParseResult from a path if it has
a scheme and adjusts the path if necessary.
Expand Down
42 changes: 26 additions & 16 deletions dissect/target/loaders/velociraptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,27 +10,36 @@
from dissect.target import Target

FILESYSTEMS_ROOT = "uploads"
UNIX_ACCESSORS = ["file", "auto"]
WINDOWS_ACCESSORS = ["mft", "ntfs", "lazy_ntfs", "ntfs_vss"]


def find_fs_directories(path: Path) -> tuple[Optional[OperatingSystem], Optional[list[Path]]]:
# As of Velociraptor version 0.6.7 the structure of the Velociraptor Offline Collector varies by operating system
# Generic.Collectors.File (Linux and OS-X) root filesystem is 'uploads/file/'
# Generic.Collectors.File (Windows) and Windows.KapeFiles.Targets (Windows) root filesystem is
# 'uploads/<file-accessor>/<drive-name>/'
# As of Velociraptor version 0.7.0 the structure of the Velociraptor Offline Collector varies by operating system.
# Generic.Collectors.File (Unix) uses the accessors file and auto.
# Generic.Collectors.File (Windows) and Windows.KapeFiles.Targets (Windows) uses the accessors
# mft, ntfs, lazy_ntfs and ntfs_vss.

fs_root = path.joinpath(FILESYSTEMS_ROOT)

# Linux and OS-X
file_root = fs_root.joinpath("file")
if file_root.exists():
os_type, dirs = find_dirs(file_root)
if os_type in [OperatingSystem.LINUX, OperatingSystem.OSX]:
return os_type, [dirs[0]]
# Unix
for accessor in UNIX_ACCESSORS:
accessor_root = fs_root.joinpath(accessor)
if accessor_root.exists():
os_type, dirs = find_dirs(accessor_root)
if os_type in [OperatingSystem.UNIX, OperatingSystem.LINUX, OperatingSystem.OSX]:
return os_type, [dirs[0]]

# Windows
volumes = set()
for accessor in WINDOWS_ACCESSORS:
accessor_root = fs_root.joinpath(accessor)
if accessor_root.exists():
# If the accessor directory exists, assume all the subdirectories are volumes
volumes.update(accessor_root.iterdir())

# This suppports usage of the ntfs accessor 'uploads/mft/%5C%5C.%5CC%3A' not the accessors lazy_ntfs or auto
mft_root = fs_root.joinpath("mft")
if mft_root.exists():
# If the `mft` directory exists, assume all the subdirectories are volumes
return OperatingSystem.WINDOWS, list(mft_root.iterdir())
if volumes:
return OperatingSystem.WINDOWS, list(volumes)

return None, None

Expand Down Expand Up @@ -62,12 +71,13 @@ def detect(path: Path) -> bool:
def map(self, target: Target) -> None:
os_type, dirs = find_fs_directories(self.path)
if os_type == OperatingSystem.WINDOWS:
# Velociraptor doesn't have the correct filenames for several files, like $J
# Velociraptor doesn't have the correct filenames for the paths "$J" and "$Secure:$SDS"
map_dirs(
target,
dirs,
os_type,
usnjrnl_path="$Extend/$UsnJrnl%3A$J",
sds_path="$Secure%3A$SDS",
)
else:
map_dirs(target, dirs, os_type)
99 changes: 68 additions & 31 deletions tests/test_loaders_velociraptor.py
Original file line number Diff line number Diff line change
@@ -1,58 +1,95 @@
from pathlib import Path

import pytest

from dissect.target import Target
from dissect.target.loaders.velociraptor import VelociraptorLoader

from ._utils import absolute_path, mkdirs


def test_velociraptor_loader_windows_ntfs(mock_target, tmp_path):
@pytest.mark.parametrize(
"paths",
[
(
[
"uploads.json",
"uploads/mft/%5C%5C.%5CC%3A/",
"uploads/mft/%5C%5C.%5CC%3A/$Extend",
"uploads/mft/%5C%5C.%5CC%3A/windows/system32",
"uploads/mft/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1",
]
),
(
[
"uploads.json",
"uploads/ntfs/%5C%5C.%5CC%3A/",
"uploads/ntfs/%5C%5C.%5CC%3A/$Extend",
"uploads/ntfs/%5C%5C.%5CC%3A/windows/system32",
"uploads/ntfs/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1",
]
),
(
[
"uploads.json",
"uploads/ntfs_vss/%5C%5C.%5CC%3A/",
"uploads/ntfs_vss/%5C%5C.%5CC%3A/$Extend",
"uploads/ntfs_vss/%5C%5C.%5CC%3A/windows/system32",
"uploads/ntfs_vss/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1",
]
),
(
[
"uploads.json",
"uploads/lazy_ntfs/%5C%5C.%5CC%3A/",
"uploads/lazy_ntfs/%5C%5C.%5CC%3A/$Extend",
"uploads/lazy_ntfs/%5C%5C.%5CC%3A/windows/system32",
"uploads/lazy_ntfs/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1",
]
),
],
)
def test_velociraptor_loader_windows_ntfs(paths: list[str], mock_target: Target, tmp_path: Path) -> None:
root = tmp_path
mkdirs(
root,
[
"uploads.json",
"uploads/mft/%5C%5C.%5CC%3A/$Extend",
"uploads/mft/%5C%5C.%5CC%3A/windows/system32",
"uploads/mft/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy1",
"uploads/mft/%5C%5C%3F%5CGLOBALROOT%5CDevice%5CHarddiskVolumeShadowCopy2",
],
)
mkdirs(root, paths)

with open(absolute_path("data/mft.raw"), "rb") as fh:
(root / "uploads/mft/%5C%5C.%5CC%3A/$MFT").write_bytes(fh.read(10 * 1025))
root.joinpath(paths[1]).joinpath("$MFT").write_bytes(fh.read(10 * 1025))

# Add one record so we can test if it works
data = bytes.fromhex(
"5800000002000000c100000000000100bf000000000001002003010000000000"
"6252641a86a4d7010381008000000000000000002000000018003c0069007300"
"2d00310035005000320036002e0074006d00700000000000"
)
(root / "uploads/mft/%5C%5C.%5CC%3A/$Extend/$UsnJrnl%3A$J").write_bytes(data)
root.joinpath(paths[2]).joinpath("$UsnJrnl%3A$J").write_bytes(data)

assert VelociraptorLoader.detect(root) is True

loader = VelociraptorLoader(root)
loader.map(mock_target)

# TODO: Add fake Secure:SDS and verify mft function
assert len(list(mock_target.usnjrnl())) == 1

# The 3 found directories + the fake NTFS filesystem
assert len(mock_target.filesystems) == 4


def test_dir_loader_linux(mock_target, tmp_path):
root = tmp_path
mkdirs(root, ["uploads.json", "uploads/file/etc", "uploads/file/var"])

assert VelociraptorLoader.detect(root) is True

loader = VelociraptorLoader(root)
loader.map(mock_target)

assert len(mock_target.filesystems) == 1


def test_dir_loader_macos(mock_target, tmp_path):
# The 2 found directories + the fake NTFS filesystem
assert len(mock_target.filesystems) == 3


@pytest.mark.parametrize(
"paths",
[
(["uploads.json", "uploads/file/etc", "uploads/file/var"]),
(["uploads.json", "uploads/auto/etc", "uploads/auto/var"]),
(["uploads.json", "uploads/file/etc", "uploads/file/var", "uploads/file/opt"]),
(["uploads.json", "uploads/auto/etc", "uploads/auto/var", "uploads/auto/opt"]),
(["uploads.json", "uploads/file/Library", "uploads/file/Applications"]),
(["uploads.json", "uploads/auto/Library", "uploads/auto/Applications"]),
],
)
def test_dir_loader_unix(paths: list[str], mock_target: Target, tmp_path: Path) -> None:
root = tmp_path
mkdirs(root, ["uploads.json", "uploads/file/Library"])
mkdirs(root, paths)

assert VelociraptorLoader.detect(root) is True

Expand Down