Skip to content

Commit

Permalink
Add support for Parallels PVM/PVS/HDD/HDS (#220)
Browse files Browse the repository at this point in the history
  • Loading branch information
Schamper committed May 2, 2023
1 parent 99abc8d commit e87d4e6
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 1 deletion.
4 changes: 3 additions & 1 deletion dissect/target/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ def open(item: Union[list, str, BinaryIO, Path], *args, **kwargs):
first_fh = first
else:
first_path = first
if first_path.exists():
if first_path.is_file():
first_fh = first.open("rb")
first_fh_opened = True

Expand Down Expand Up @@ -213,4 +213,6 @@ def open(item: Union[list, str, BinaryIO, Path], *args, **kwargs):
register("vhd", "VhdContainer")
register("qcow2", "QCow2Container")
register("vdi", "VdiContainer")
register("hdd", "HddContainer")
register("hds", "HdsContainer")
register("split", "SplitContainer")
37 changes: 37 additions & 0 deletions dissect/target/containers/hdd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
import io
from pathlib import Path
from typing import BinaryIO, Union

from dissect.hypervisor import hdd

from dissect.target.container import Container


class HddContainer(Container):
def __init__(self, fh: Path, *args, **kwargs):
if hasattr(fh, "read"):
raise TypeError("HddContainer can only be opened by path")

self.hdd = hdd.HDD(fh)
self.stream = self.hdd.open()
super().__init__(fh, self.stream.size, *args, **kwargs)

@staticmethod
def detect_fh(fh: BinaryIO, original: Union[list, BinaryIO]) -> bool:
return False

@staticmethod
def detect_path(path: Path, original: Union[list, BinaryIO]) -> bool:
return path.suffix.lower() == ".hdd"

def read(self, length: int) -> bytes:
return self.stream.read(length)

def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
return self.stream.seek(offset, whence)

def tell(self) -> int:
return self.stream.tell()

def close(self) -> None:
self.stream.close()
41 changes: 41 additions & 0 deletions dissect/target/containers/hds.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
import io
from pathlib import Path
from typing import BinaryIO, Union

from dissect.hypervisor.disk import hdd
from dissect.hypervisor.disk.c_hdd import c_hdd

from dissect.target.container import Container


class HdsContainer(Container):
def __init__(self, fh: Union[BinaryIO, Path], *args, **kwargs):
f = fh
if not hasattr(fh, "read"):
f = fh.open("rb")

self.hds = hdd.HDS(f)
super().__init__(fh, self.hds.size, *args, **kwargs)

@staticmethod
def detect_fh(fh: BinaryIO, original: Union[list, BinaryIO]) -> bool:
sig = fh.read(16)
fh.seek(-16, io.SEEK_CUR)

return sig in (c_hdd.SIGNATURE_STRUCTURED_DISK_V1, c_hdd.SIGNATURE_STRUCTURED_DISK_V2)

@staticmethod
def detect_path(path: Path, original: Union[list, BinaryIO]) -> bool:
return path.suffix.lower() == ".hds"

def read(self, length: int) -> bytes:
return self.hds.read(length)

def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
return self.hds.seek(offset, whence)

def tell(self) -> int:
return self.hds.tell()

def close(self) -> None:
self.hds.close()
2 changes: 2 additions & 0 deletions dissect/target/loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,8 @@ def open(item: Union[str, Path], *args, **kwargs):
register("vmx", "VmxLoader")
register("vmwarevm", "VmwarevmLoader")
register("hyperv", "HyperVLoader")
register("pvs", "PvsLoader")
register("pvm", "PvmLoader")
register("ovf", "OvfLoader")
register("vbox", "VBoxLoader")
register("ewf", "EwfLoader")
Expand Down
14 changes: 14 additions & 0 deletions dissect/target/loaders/pvm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from pathlib import Path

from dissect.target.loaders.pvs import PvsLoader


class PvmLoader(PvsLoader):
"""Parallels VM directory (.pvm)."""

def __init__(self, path: Path, **kwargs):
super().__init__(path.joinpath("config.pvs"))

@staticmethod
def detect(path: Path) -> bool:
return path.is_dir() and path.suffix.lower() == ".pvm" and path.joinpath("config.pvs").exists()
35 changes: 35 additions & 0 deletions dissect/target/loaders/pvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
from __future__ import annotations

from pathlib import Path
from typing import TYPE_CHECKING

from dissect.hypervisor import pvs

from dissect.target.containers.hdd import HddContainer
from dissect.target.loader import Loader

if TYPE_CHECKING:
from dissect.target import Target


class PvsLoader(Loader):
"""Parallels VM configuration file (config.pvs)."""

def __init__(self, path: Path, **kwargs):
path = path.resolve()

super().__init__(path)
self.pvs = pvs.PVS(path.open("rt"))
self.base_dir = path.parent

@staticmethod
def detect(path: Path) -> bool:
return path.suffix.lower() == ".pvs"

def map(self, target: Target) -> None:
for disk in self.pvs.disks():
path = self.base_dir.joinpath(disk)
try:
target.disks.add(HddContainer(path))
except Exception:
target.log.exception("Failed to load HDD: %s", disk)
22 changes: 22 additions & 0 deletions tests/test_loaders_pvm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
from unittest.mock import call, patch

from dissect.target.loaders.pvm import PvmLoader

from ._utils import mkdirs


@patch("dissect.target.loaders.pvs.HddContainer")
@patch("dissect.target.loaders.pvs.pvs.PVS")
def test_pvm_loader(PVS, HddContainer, mock_target, tmp_path):
mkdirs(tmp_path, ["Test.pvm"])
(tmp_path / "Test.pvm" / "config.pvs").touch()

PVS.return_value = PVS
PVS.disks.return_value = ["mock.hdd"]
HddContainer.return_value = HddContainer

pvm_loader = PvmLoader(tmp_path / "Test.pvm")
pvm_loader.map(mock_target)

assert len(mock_target.disks) == 1
assert HddContainer.mock_calls == [call(tmp_path.resolve() / "Test.pvm" / "mock.hdd")]
20 changes: 20 additions & 0 deletions tests/test_loaders_pvs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
from unittest.mock import call, patch

from dissect.target.loaders.pvs import PvsLoader


@patch("pathlib.Path")
@patch("dissect.target.loaders.pvs.HddContainer")
@patch("dissect.target.loaders.pvs.pvs.PVS")
def test_pvs_loader(PVS, HddContainer, Path, mock_target):
PVS.return_value = PVS
PVS.disks.return_value = ["mock.hdd"]
HddContainer.return_value = HddContainer

pvs_loader = PvsLoader(Path("/mock.pvs"))
pvs_loader.map(mock_target)

expected = [call.disks()]
del PVS.mock_calls[0]
assert expected == PVS.mock_calls
assert len(mock_target.disks) == 1

0 comments on commit e87d4e6

Please sign in to comment.