Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Parallels PVM/PVS/HDD/HDS #220

Merged
merged 5 commits into from
May 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion dissect/target/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def open(item: Union[list, str, BinaryIO, Path], *args, **kwargs):
first_fh = first
else:
first_path = first
if first_path.exists():
if first_path.is_file():
first_fh = first.open("rb")
first_fh_opened = True

Expand Down Expand Up @@ -213,4 +213,6 @@ def open(item: Union[list, str, BinaryIO, Path], *args, **kwargs):
register("vhd", "VhdContainer")
register("qcow2", "QCow2Container")
register("vdi", "VdiContainer")
register("hdd", "HddContainer")
register("hds", "HdsContainer")
register("split", "SplitContainer")
37 changes: 37 additions & 0 deletions dissect/target/containers/hdd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import io
from pathlib import Path
from typing import BinaryIO, Union

from dissect.hypervisor import hdd

from dissect.target.container import Container


class HddContainer(Container):
def __init__(self, fh: Path, *args, **kwargs):
if hasattr(fh, "read"):
raise TypeError("HddContainer can only be opened by path")

self.hdd = hdd.HDD(fh)
self.stream = self.hdd.open()
super().__init__(fh, self.stream.size, *args, **kwargs)

@staticmethod
def detect_fh(fh: BinaryIO, original: Union[list, BinaryIO]) -> bool:
return False

@staticmethod
def detect_path(path: Path, original: Union[list, BinaryIO]) -> bool:
return path.suffix.lower() == ".hdd"

def read(self, length: int) -> bytes:
Schamper marked this conversation as resolved.
Show resolved Hide resolved
return self.stream.read(length)

def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
return self.stream.seek(offset, whence)

def tell(self) -> int:
return self.stream.tell()

def close(self) -> None:
self.stream.close()
41 changes: 41 additions & 0 deletions dissect/target/containers/hds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import io
from pathlib import Path
from typing import BinaryIO, Union

from dissect.hypervisor.disk import hdd
from dissect.hypervisor.disk.c_hdd import c_hdd

from dissect.target.container import Container


class HdsContainer(Container):
def __init__(self, fh: Union[BinaryIO, Path], *args, **kwargs):
f = fh
if not hasattr(fh, "read"):
f = fh.open("rb")

self.hds = hdd.HDS(f)
super().__init__(fh, self.hds.size, *args, **kwargs)

@staticmethod
def detect_fh(fh: BinaryIO, original: Union[list, BinaryIO]) -> bool:
sig = fh.read(16)
fh.seek(-16, io.SEEK_CUR)

return sig in (c_hdd.SIGNATURE_STRUCTURED_DISK_V1, c_hdd.SIGNATURE_STRUCTURED_DISK_V2)

@staticmethod
def detect_path(path: Path, original: Union[list, BinaryIO]) -> bool:
return path.suffix.lower() == ".hds"

def read(self, length: int) -> bytes:
return self.hds.read(length)

def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
return self.hds.seek(offset, whence)

def tell(self) -> int:
return self.hds.tell()

def close(self) -> None:
self.hds.close()
2 changes: 2 additions & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,8 @@ def open(item: Union[str, Path], *args, **kwargs):
register("tar", "TarLoader")
register("vmx", "VmxLoader")
register("hyperv", "HyperVLoader")
register("pvs", "PvsLoader")
register("pvm", "PvmLoader")
register("ovf", "OvfLoader")
register("vbox", "VBoxLoader")
register("ewf", "EwfLoader")
Expand Down
14 changes: 14 additions & 0 deletions dissect/target/loaders/pvm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pathlib import Path

from dissect.target.loaders.pvs import PvsLoader


class PvmLoader(PvsLoader):
"""Parallels VM directory (.pvm)."""

def __init__(self, path: Path, **kwargs):
super().__init__(path.joinpath("config.pvs"))

@staticmethod
def detect(path: Path) -> bool:
return path.is_dir() and path.suffix.lower() == ".pvm" and path.joinpath("config.pvs").exists()
35 changes: 35 additions & 0 deletions dissect/target/loaders/pvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from dissect.hypervisor import pvs

from dissect.target.containers.hdd import HddContainer
from dissect.target.loader import Loader

if TYPE_CHECKING:
from dissect.target import Target


class PvsLoader(Loader):
"""Parallels VM configuration file (config.pvs)."""

def __init__(self, path: Path, **kwargs):
path = path.resolve()

super().__init__(path)
self.pvs = pvs.PVS(path.open("rt"))
self.base_dir = path.parent

@staticmethod
def detect(path: Path) -> bool:
return path.suffix.lower() == ".pvs"

def map(self, target: Target) -> None:
for disk in self.pvs.disks():
path = self.base_dir.joinpath(disk)
try:
target.disks.add(HddContainer(path))
except Exception:
target.log.exception("Failed to load HDD: %s", disk)
22 changes: 22 additions & 0 deletions tests/test_loaders_pvm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from unittest.mock import call, patch

from dissect.target.loaders.pvm import PvmLoader

from ._utils import mkdirs


@patch("dissect.target.loaders.pvs.HddContainer")
@patch("dissect.target.loaders.pvs.pvs.PVS")
def test_pvm_loader(PVS, HddContainer, mock_target, tmp_path):
mkdirs(tmp_path, ["Test.pvm"])
(tmp_path / "Test.pvm" / "config.pvs").touch()

PVS.return_value = PVS
PVS.disks.return_value = ["mock.hdd"]
HddContainer.return_value = HddContainer

pvm_loader = PvmLoader(tmp_path / "Test.pvm")
pvm_loader.map(mock_target)

assert len(mock_target.disks) == 1
assert HddContainer.mock_calls == [call(tmp_path.resolve() / "Test.pvm" / "mock.hdd")]
20 changes: 20 additions & 0 deletions tests/test_loaders_pvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from unittest.mock import call, patch

from dissect.target.loaders.pvs import PvsLoader


@patch("pathlib.Path")
@patch("dissect.target.loaders.pvs.HddContainer")
@patch("dissect.target.loaders.pvs.pvs.PVS")
def test_pvs_loader(PVS, HddContainer, Path, mock_target):
PVS.return_value = PVS
PVS.disks.return_value = ["mock.hdd"]
HddContainer.return_value = HddContainer

pvs_loader = PvsLoader(Path("/mock.pvs"))
pvs_loader.map(mock_target)

expected = [call.disks()]
del PVS.mock_calls[0]
assert expected == PVS.mock_calls
assert len(mock_target.disks) == 1