Skip to content

Commit

Permalink
Add support for decrypting System DPAPI secrets (#305)
Browse files Browse the repository at this point in the history
Co-authored-by: Schamper <1254028+Schamper@users.noreply.github.com>
  • Loading branch information
cobyge and Schamper committed Sep 20, 2023
1 parent 714f541 commit 5bce890
Show file tree
Hide file tree
Showing 14 changed files with 848 additions and 38 deletions.
1 change: 1 addition & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ tests/data/plugins/browsers/firefox/places.sqlite filter=lfs diff=lfs merge=lfs
tests/data/plugins/browsers/chrome/History.sqlite filter=lfs diff=lfs merge=lfs -text
tests/data/plugins/browsers/edge/History.sqlite filter=lfs diff=lfs merge=lfs -text
tests/data/plugins/browsers/chromium/History.sqlite filter=lfs diff=lfs merge=lfs -text
tests/data/plugins/os/windows/dpapi/** filter=lfs diff=lfs merge=lfs -text
tests/data/volumes/md-nested.bin.gz filter=lfs diff=lfs merge=lfs -text
tests/_data/plugins/os/windows/catroot/* filter=lfs diff=lfs merge=lfs -text
15 changes: 10 additions & 5 deletions dissect/target/helpers/hashutil.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
from __future__ import annotations

import hashlib
from typing import TYPE_CHECKING

from flow.record import GroupedRecord, Record, RecordDescriptor
from flow.record.fieldtypes import path, posix_path, windows_path

from dissect.target.exceptions import FileNotFoundError, IsADirectoryError
from dissect.target.plugins.filesystem.resolver import ResolverPlugin

if TYPE_CHECKING:
from dissect.target.target import Target

BUFFER_SIZE = 32768

Expand Down Expand Up @@ -81,10 +86,10 @@ def hash_uri_records(target, record: Record) -> Record:
return GroupedRecord(record._desc.name, [record, hashed_holder])


def hash_uri(target, uri: str) -> Record:
"""Hash the target uri."""
if uri is None:
def hash_uri(target: Target, path: str) -> Record:
"""Hash the target path."""
if path is None:
raise FileNotFoundError()

path = ResolverPlugin(target).resolve(uri)
path = target.resolve(path)
return (path, target.fs.hash(path))
4 changes: 2 additions & 2 deletions dissect/target/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@
"""
from __future__ import annotations

import enum
import fnmatch
import importlib
import importlib.util
Expand All @@ -23,6 +22,7 @@
from dissect.target.exceptions import PluginError, UnsupportedPluginError
from dissect.target.helpers import cache
from dissect.target.helpers.record import EmptyRecord
from dissect.target.helpers.utils import StrEnum

try:
from dissect.target.plugins._pluginlist import PLUGINS
Expand Down Expand Up @@ -53,7 +53,7 @@
log = logging.getLogger(__name__)


class OperatingSystem(enum.Enum):
class OperatingSystem(StrEnum):
LINUX = "linux"
WINDOWS = "windows"
ESXI = "esxi"
Expand Down
31 changes: 21 additions & 10 deletions dissect/target/plugins/apps/vpns/wireguard.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from collections import OrderedDict
from configparser import ConfigParser
from os.path import basename
from pathlib import Path
from typing import Iterator, Union

from dissect.target.exceptions import UnsupportedPluginError
from dissect.target.helpers.record import TargetRecordDescriptor
from dissect.target.plugin import Plugin, export
from dissect.target.plugin import OperatingSystem, Plugin, export

WireGuardInterfaceRecord = TargetRecordDescriptor(
"application/vpn/wireguard/interface",
Expand Down Expand Up @@ -63,33 +64,43 @@ class WireGuardPlugin(Plugin):
- C:\\Program Files\\WireGuard\\Data\\Configurations
"""

config_globs = [
CONFIG_GLOBS = [
# Linux
"/etc/wireguard/*.conf",
# MacOS
"/usr/local/etc/wireguard/*.conf",
"/opt/homebrew/etc/wireguard/*.conf",
# Windows
"C:\\Windows\\System32\\config\\systemprofile\\AppData\\Local\\WireGuard\\Configurations\\*.dpapi",
"C:\\Program Files\\WireGuard\\Data\\Configurations\\*.dpapi",
]

def __init__(self, target) -> None:
super().__init__(target)
self.configs = []
for path in self.config_globs:
cfgs = list(self.target.fs.path().glob(path.lstrip("/")))
if len(cfgs) > 0:
for cfg in cfgs:
self.configs.append(cfg)
self.configs: list[Path] = []
for path in self.CONFIG_GLOBS:
self.configs.extend(self.target.fs.path().glob(path.lstrip("/")))

def check_compatible(self) -> None:
if not len(self.configs):
if not self.configs:
raise UnsupportedPluginError("No Wireguard configuration files found")

@export(record=[WireGuardInterfaceRecord, WireGuardPeerRecord])
def config(self) -> Iterator[Union[WireGuardInterfaceRecord, WireGuardPeerRecord]]:
"""Parses interface config files from wireguard installations."""

for config_path in self.configs:
config = _parse_config(config_path.read_text())
if self.target.os == OperatingSystem.WINDOWS and config_path.suffix == ".dpapi":
try:
config_buf = self.target.dpapi.decrypt_system_blob(config_path.read_bytes())
config_buf = config_buf.decode()
except ValueError:
self.target.log.warning("Failed to decrypt WireGuard configuration at %s", config_path)
continue
else:
config_buf = config_path.read_text()

config = _parse_config(config_buf)

for section in config.sections():
if "Interface" in section:
Expand Down
Empty file.
151 changes: 151 additions & 0 deletions dissect/target/plugins/os/windows/dpapi/blob.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
from typing import Optional
from uuid import UUID

from dissect.cstruct import cstruct

from dissect.target.plugins.os.windows.dpapi.crypto import (
CipherAlgorithm,
HashAlgorithm,
crypt_session_key_type1,
crypt_session_key_type2,
)

blob_def = """
struct DPAPIBlob {
DWORD dwVersion;
char provider[16];
DWORD mkVersion;
char guid[16];
DWORD flags;
DWORD descriptionLength;
char description[descriptionLength];
DWORD CipherAlgId;
DWORD keyLen;
DWORD saltLength;
char salt[saltLength];
DWORD strongLength;
char strong[strongLength];
DWORD CryptAlgId;
DWORD hashLen;
DWORD hmacLength;
char hmac[hmacLength];
DWORD cipherTextLength;
char cipherText[cipherTextLength];
DWORD signLength;
char sign[signLength];
};
"""

c_blob = cstruct()
c_blob.load(blob_def)


class Blob:
"""Represents a DPAPI blob."""

def __init__(self, data: bytes):
self._blob = c_blob.DPAPIBlob(data)

self.version = self._blob.dwVersion
self.provider = str(UUID(bytes_le=self._blob.provider))
self.mkversion = self._blob.mkVersion
self.guid = str(UUID(bytes_le=self._blob.guid))
self.flags = self._blob.flags
self.description = self._blob.description.decode("utf-16-le")
self.cipher_algorithm = CipherAlgorithm.from_id(self._blob.CipherAlgId)
self.key_len = self._blob.keyLen
self.salt = self._blob.salt
self.strong = self._blob.strong
self.hash_algorithm = HashAlgorithm.from_id(self._blob.CryptAlgId)
self.hash_len = self._blob.hashLen
self.hmac = self._blob.hmac
self.cipher_text = self._blob.cipherText

# All the blob data between the version, provider and sign fields
# TODO: Replace with future offsetof function in cstruct
self.blob = data[c_blob.DPAPIBlob.lookup["mkVersion"].offset : -(self._blob.signLength + len(c_blob.DWORD))]
self.sign = self._blob.sign

self.clear_text = None
self.decrypted = False
self.sign_computed = None

def decrypt(
self,
master_key: bytes,
entropy: Optional[bytes] = None,
strong_password: Optional[str] = None,
smart_card_secret: Optional[bytes] = None,
) -> bool:
"""Try to decrypt the blob with the given master key.
Args:
master_key: Decrypted master key value.
entropy: Optional entropy for decrypting the blob.
strong_password: Optional password for decrypting the blob.
smart_card_secret: MS Next Gen Crypto secret (e.g. from PIN code).
Returns:
True if decryption is succesful, False otherwise.
"""
if self.decrypted:
return True

for algo in [crypt_session_key_type1, crypt_session_key_type2]:
session_key = algo(
master_key,
self.salt,
self.hash_algorithm,
entropy=entropy,
smart_card_secret=smart_card_secret,
strong_password=strong_password,
)
key = self.cipher_algorithm.derive_key(session_key, self.hash_algorithm)
self.clear_text = self.cipher_algorithm.decrypt(self.cipher_text, key)

padding = self.clear_text[-1]
if padding <= self.cipher_algorithm.block_length:
self.clear_text = self.clear_text[:-padding]

# Check against provided HMAC
self.sign_computed = algo(
master_key,
self.hmac,
self.hash_algorithm,
entropy=entropy,
smart_card_secret=smart_card_secret,
verify_blob=self.blob,
)

self.decrypted = self.sign_computed == self.sign
if self.decrypted:
return True

self.decrypted = False
return self.decrypted

def __repr__(self) -> str:
s = [
"DPAPI BLOB",
"\n".join(
(
"\tversion = %(version)d",
"\tprovider = %(provider)s",
"\tmkey = %(guid)s",
"\tflags = %(flags)#x",
"\tdescr = %(description)s",
"\tcipher_algo = %(cipher_algorithm)r",
"\thash_algo = %(hash_algorithm)r",
)
)
% self.__dict__,
"\tsalt = %s" % self.salt.hex(),
"\thmac = %s" % self.hmac.hex(),
"\tcipher = %s" % self.cipher_text.hex(),
"\tsign = %s" % self.sign.hex(),
]
if self.sign_computed is not None:
s.append("\tsign_computed = %s" % self.sign_computed.hex())
if self.clear_text is not None:
s.append("\tcleartext = %r" % self.clear_text)
return "\n".join(s)
Loading

0 comments on commit 5bce890

Please sign in to comment.