diff --git a/.gitattributes b/.gitattributes index 05bb94347..96b3847fa 100644 --- a/.gitattributes +++ b/.gitattributes @@ -5,4 +5,5 @@ tests/data/plugins/browsers/firefox/places.sqlite filter=lfs diff=lfs merge=lfs tests/data/plugins/browsers/chrome/History.sqlite filter=lfs diff=lfs merge=lfs -text tests/data/plugins/browsers/edge/History.sqlite filter=lfs diff=lfs merge=lfs -text tests/data/plugins/browsers/chromium/History.sqlite filter=lfs diff=lfs merge=lfs -text +tests/data/plugins/os/windows/dpapi/** filter=lfs diff=lfs merge=lfs -text tests/data/volumes/md-nested.bin.gz filter=lfs diff=lfs merge=lfs -text diff --git a/dissect/target/helpers/hashutil.py b/dissect/target/helpers/hashutil.py index 2a0839784..3edda0abc 100644 --- a/dissect/target/helpers/hashutil.py +++ b/dissect/target/helpers/hashutil.py @@ -1,10 +1,15 @@ +from __future__ import annotations + import hashlib +from typing import TYPE_CHECKING from flow.record import GroupedRecord, Record, RecordDescriptor from flow.record.fieldtypes import path, posix_path, windows_path from dissect.target.exceptions import FileNotFoundError, IsADirectoryError -from dissect.target.plugins.filesystem.resolver import ResolverPlugin + +if TYPE_CHECKING: + from dissect.target.target import Target BUFFER_SIZE = 32768 @@ -81,10 +86,10 @@ def hash_uri_records(target, record: Record) -> Record: return GroupedRecord(record._desc.name, [record, hashed_holder]) -def hash_uri(target, uri: str) -> Record: - """Hash the target uri.""" - if uri is None: +def hash_uri(target: Target, path: str) -> Record: + """Hash the target path.""" + if path is None: raise FileNotFoundError() - path = ResolverPlugin(target).resolve(uri) + path = target.resolve(path) return (path, target.fs.hash(path)) diff --git a/dissect/target/plugin.py b/dissect/target/plugin.py index fd9fc08b0..4ebbed88a 100644 --- a/dissect/target/plugin.py +++ b/dissect/target/plugin.py @@ -4,7 +4,6 @@ """ from __future__ import annotations -import enum import fnmatch import importlib import importlib.util @@ -23,6 +22,7 @@ from dissect.target.exceptions import PluginError, UnsupportedPluginError from dissect.target.helpers import cache from dissect.target.helpers.record import EmptyRecord +from dissect.target.helpers.utils import StrEnum try: from dissect.target.plugins._pluginlist import PLUGINS @@ -53,7 +53,7 @@ log = logging.getLogger(__name__) -class OperatingSystem(enum.Enum): +class OperatingSystem(StrEnum): LINUX = "linux" WINDOWS = "windows" ESXI = "esxi" diff --git a/dissect/target/plugins/apps/vpns/wireguard.py b/dissect/target/plugins/apps/vpns/wireguard.py index e29e122b5..b17e767c7 100644 --- a/dissect/target/plugins/apps/vpns/wireguard.py +++ b/dissect/target/plugins/apps/vpns/wireguard.py @@ -1,11 +1,12 @@ from collections import OrderedDict from configparser import ConfigParser from os.path import basename +from pathlib import Path from typing import Iterator, Union from dissect.target.exceptions import UnsupportedPluginError from dissect.target.helpers.record import TargetRecordDescriptor -from dissect.target.plugin import Plugin, export +from dissect.target.plugin import OperatingSystem, Plugin, export WireGuardInterfaceRecord = TargetRecordDescriptor( "application/vpn/wireguard/interface", @@ -63,25 +64,25 @@ class WireGuardPlugin(Plugin): - C:\\Program Files\\WireGuard\\Data\\Configurations """ - config_globs = [ + CONFIG_GLOBS = [ # Linux "/etc/wireguard/*.conf", # MacOS "/usr/local/etc/wireguard/*.conf", "/opt/homebrew/etc/wireguard/*.conf", + # Windows + "C:\\Windows\\System32\\config\\systemprofile\\AppData\\Local\\WireGuard\\Configurations\\*.dpapi", + "C:\\Program Files\\WireGuard\\Data\\Configurations\\*.dpapi", ] def __init__(self, target) -> None: super().__init__(target) - self.configs = [] - for path in self.config_globs: - cfgs = list(self.target.fs.path().glob(path.lstrip("/"))) - if len(cfgs) > 0: - for cfg in cfgs: - self.configs.append(cfg) + self.configs: list[Path] = [] + for path in self.CONFIG_GLOBS: + self.configs.extend(self.target.fs.path().glob(path.lstrip("/"))) def check_compatible(self) -> None: - if not len(self.configs): + if not self.configs: raise UnsupportedPluginError("No Wireguard configuration files found") @export(record=[WireGuardInterfaceRecord, WireGuardPeerRecord]) @@ -89,7 +90,17 @@ def config(self) -> Iterator[Union[WireGuardInterfaceRecord, WireGuardPeerRecord """Parses interface config files from wireguard installations.""" for config_path in self.configs: - config = _parse_config(config_path.read_text()) + if self.target.os == OperatingSystem.WINDOWS and config_path.suffix == ".dpapi": + try: + config_buf = self.target.dpapi.decrypt_system_blob(config_path.read_bytes()) + config_buf = config_buf.decode() + except ValueError: + self.target.log.warning("Failed to decrypt WireGuard configuration at %s", config_path) + continue + else: + config_buf = config_path.read_text() + + config = _parse_config(config_buf) for section in config.sections(): if "Interface" in section: diff --git a/dissect/target/plugins/os/windows/dpapi/__init__.py b/dissect/target/plugins/os/windows/dpapi/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/dissect/target/plugins/os/windows/dpapi/blob.py b/dissect/target/plugins/os/windows/dpapi/blob.py new file mode 100644 index 000000000..fd3e37923 --- /dev/null +++ b/dissect/target/plugins/os/windows/dpapi/blob.py @@ -0,0 +1,151 @@ +from typing import Optional +from uuid import UUID + +from dissect.cstruct import cstruct + +from dissect.target.plugins.os.windows.dpapi.crypto import ( + CipherAlgorithm, + HashAlgorithm, + crypt_session_key_type1, + crypt_session_key_type2, +) + +blob_def = """ +struct DPAPIBlob { + DWORD dwVersion; + char provider[16]; + DWORD mkVersion; + char guid[16]; + DWORD flags; + DWORD descriptionLength; + char description[descriptionLength]; + DWORD CipherAlgId; + DWORD keyLen; + DWORD saltLength; + char salt[saltLength]; + DWORD strongLength; + char strong[strongLength]; + DWORD CryptAlgId; + DWORD hashLen; + DWORD hmacLength; + char hmac[hmacLength]; + DWORD cipherTextLength; + char cipherText[cipherTextLength]; + DWORD signLength; + char sign[signLength]; +}; +""" + +c_blob = cstruct() +c_blob.load(blob_def) + + +class Blob: + """Represents a DPAPI blob.""" + + def __init__(self, data: bytes): + self._blob = c_blob.DPAPIBlob(data) + + self.version = self._blob.dwVersion + self.provider = str(UUID(bytes_le=self._blob.provider)) + self.mkversion = self._blob.mkVersion + self.guid = str(UUID(bytes_le=self._blob.guid)) + self.flags = self._blob.flags + self.description = self._blob.description.decode("utf-16-le") + self.cipher_algorithm = CipherAlgorithm.from_id(self._blob.CipherAlgId) + self.key_len = self._blob.keyLen + self.salt = self._blob.salt + self.strong = self._blob.strong + self.hash_algorithm = HashAlgorithm.from_id(self._blob.CryptAlgId) + self.hash_len = self._blob.hashLen + self.hmac = self._blob.hmac + self.cipher_text = self._blob.cipherText + + # All the blob data between the version, provider and sign fields + # TODO: Replace with future offsetof function in cstruct + self.blob = data[c_blob.DPAPIBlob.lookup["mkVersion"].offset : -(self._blob.signLength + len(c_blob.DWORD))] + self.sign = self._blob.sign + + self.clear_text = None + self.decrypted = False + self.sign_computed = None + + def decrypt( + self, + master_key: bytes, + entropy: Optional[bytes] = None, + strong_password: Optional[str] = None, + smart_card_secret: Optional[bytes] = None, + ) -> bool: + """Try to decrypt the blob with the given master key. + + Args: + master_key: Decrypted master key value. + entropy: Optional entropy for decrypting the blob. + strong_password: Optional password for decrypting the blob. + smart_card_secret: MS Next Gen Crypto secret (e.g. from PIN code). + + Returns: + True if decryption is succesful, False otherwise. + """ + if self.decrypted: + return True + + for algo in [crypt_session_key_type1, crypt_session_key_type2]: + session_key = algo( + master_key, + self.salt, + self.hash_algorithm, + entropy=entropy, + smart_card_secret=smart_card_secret, + strong_password=strong_password, + ) + key = self.cipher_algorithm.derive_key(session_key, self.hash_algorithm) + self.clear_text = self.cipher_algorithm.decrypt(self.cipher_text, key) + + padding = self.clear_text[-1] + if padding <= self.cipher_algorithm.block_length: + self.clear_text = self.clear_text[:-padding] + + # Check against provided HMAC + self.sign_computed = algo( + master_key, + self.hmac, + self.hash_algorithm, + entropy=entropy, + smart_card_secret=smart_card_secret, + verify_blob=self.blob, + ) + + self.decrypted = self.sign_computed == self.sign + if self.decrypted: + return True + + self.decrypted = False + return self.decrypted + + def __repr__(self) -> str: + s = [ + "DPAPI BLOB", + "\n".join( + ( + "\tversion = %(version)d", + "\tprovider = %(provider)s", + "\tmkey = %(guid)s", + "\tflags = %(flags)#x", + "\tdescr = %(description)s", + "\tcipher_algo = %(cipher_algorithm)r", + "\thash_algo = %(hash_algorithm)r", + ) + ) + % self.__dict__, + "\tsalt = %s" % self.salt.hex(), + "\thmac = %s" % self.hmac.hex(), + "\tcipher = %s" % self.cipher_text.hex(), + "\tsign = %s" % self.sign.hex(), + ] + if self.sign_computed is not None: + s.append("\tsign_computed = %s" % self.sign_computed.hex()) + if self.clear_text is not None: + s.append("\tcleartext = %r" % self.clear_text) + return "\n".join(s) diff --git a/dissect/target/plugins/os/windows/dpapi/crypto.py b/dissect/target/plugins/os/windows/dpapi/crypto.py new file mode 100644 index 000000000..d31721ba6 --- /dev/null +++ b/dissect/target/plugins/os/windows/dpapi/crypto.py @@ -0,0 +1,292 @@ +from __future__ import annotations + +import hashlib +import hmac +from typing import Optional, Union + +from Crypto.Cipher import AES, ARC4 + +CIPHER_ALGORITHMS: dict[Union[int, str], CipherAlgorithm] = {} +HASH_ALGORITHMS: dict[Union[int, str], HashAlgorithm] = {} + + +class CipherAlgorithm: + id: int + name: str + key_length: int + iv_length: int + block_length: int + + def __init_subclass__(cls): + CIPHER_ALGORITHMS[cls.id] = cls + CIPHER_ALGORITHMS[cls.name] = cls + + @classmethod + def from_id(cls, id: int) -> CipherAlgorithm: + return CIPHER_ALGORITHMS[id]() + + @classmethod + def from_name(cls, name: str) -> CipherAlgorithm: + return CIPHER_ALGORITHMS[name]() + + def derive_key(self, key: bytes, hash_algorithm: HashAlgorithm) -> bytes: + """Mimics the corresponding native Microsoft function.""" + if len(key) > hash_algorithm.block_length: + key = hashlib.new(hash_algorithm.name, key).digest() + + if len(key) >= hash_algorithm.digest_length: + return key + + key = key.ljust(hash_algorithm.block_length, b"\x00") + pad1 = bytes(c ^ 0x36 for c in key) + pad2 = bytes(c ^ 0x5C for c in key) + return hashlib.new(hash_algorithm.name, pad1).digest() + hashlib.new(hash_algorithm.name, pad2).digest() + + def decrypt_with_hmac( + self, data: bytes, key: bytes, iv: bytes, hash_algorithm: HashAlgorithm, rounds: int + ) -> bytes: + derived = pbkdf2(key, iv, self.key_length + self.iv_length, rounds, hash_algorithm.name) + key, iv = derived[: self.key_length], derived[self.key_length :] + + return self.decrypt(data, key, iv) + + def decrypt(self, data: bytes, key: bytes, iv: Optional[bytes] = None) -> bytes: + raise NotImplementedError() + + +class _AES(CipherAlgorithm): + id = 0x6611 + name = "AES" + key_length = 128 // 8 + iv_length = 128 // 8 + block_length = 128 // 8 + + def decrypt(self, data: bytes, key: bytes, iv: Optional[bytes] = None) -> bytes: + cipher = AES.new( + key[: self.key_length], mode=AES.MODE_CBC, IV=iv[: self.iv_length] if iv else b"\x00" * self.iv_length + ) + return cipher.decrypt(data) + + +class _AES128(_AES): + id = 0x660E + name = "AES-128" + + +class _AES192(_AES): + id = 0x660F + name = "AES-192" + key_length = 192 // 8 + + +class _AES256(_AES): + id = 0x6610 + name = "AES-256" + key_length = 256 // 8 + + +class _RC4(CipherAlgorithm): + id = 0x6801 + name = "RC4" + key_length = 40 // 8 + iv_length = 128 // 8 + block_length = 1 // 8 + + def decrypt(self, data: bytes, key: bytes, iv: Optional[bytes] = None) -> bytes: + cipher = ARC4.new(key[: self.key_length]) + return cipher.decrypt(data) + + +class HashAlgorithm: + id: int + name: str + digest_length: int + block_length: int + + def __init_subclass__(cls): + HASH_ALGORITHMS[cls.id] = cls + HASH_ALGORITHMS[cls.name] = cls + + @classmethod + def from_id(cls, id: int) -> HashAlgorithm: + return HASH_ALGORITHMS[id]() + + @classmethod + def from_name(cls, name: str) -> Optional[HashAlgorithm]: + return HASH_ALGORITHMS[name]() + + +class _MD5(HashAlgorithm): + id = 0x8003 + name = "md5" + digest_length = 128 // 8 + block_length = 512 // 8 + + +class _SHA1(HashAlgorithm): + id = 0x8004 + name = "sha1" + digest_length = 160 // 8 + block_length = 512 // 8 + + +class _HMAC(_SHA1): + """Synonymous to SHA1.""" + + id = 0x8009 + + +class _SHA256(HashAlgorithm): + id = 0x8004 + name = "sha256" + digest_length = 256 // 8 + block_length = 512 // 8 + + +class _SHA384(HashAlgorithm): + id = 0x800D + name = "sha384" + digest_length = 384 // 8 + block_length = 1024 // 8 + + +class _SHA512(HashAlgorithm): + id = 0x800E + name = "sha512" + digest_length = 512 // 8 + block_length = 1024 // 8 + + +def pbkdf2(passphrase: bytes, salt: bytes, key_len: int, iterations: int, digest: str = "sha1") -> bytes: + """Implementation of PBKDF2 that allows specifying digest algorithm. + + Returns the corresponding expanded key which is ``key_len`` long. + """ + key = bytearray() + + i = 1 + while len(key) < key_len: + U = salt + i.to_bytes(4, "big") + i += 1 + + derived = hmac.new(passphrase, U, digestmod=digest).digest() + for _ in range(iterations - 1): + actual = hmac.new(passphrase, derived, digestmod=digest).digest() + derived = bytes(x ^ y for x, y in zip(derived, actual)) + key.extend(derived) + + return bytes(key[:key_len]) + + +def dpapi_hmac(pwd_hash: bytes, hmac_salt: bytes, value: bytes, hash_algorithm: HashAlgorithm) -> bytes: + """Internal function used to compute HMACs of DPAPI structures.""" + key = hmac.new(pwd_hash, hmac_salt, digestmod=hash_algorithm.name).digest() + return hmac.new(key, value, digestmod=hash_algorithm.name).digest() + + +def crypt_session_key_type1( + master_key: bytes, + nonce: Optional[bytes], + hash_algorithm: HashAlgorithm, + entropy: Optional[bytes] = None, + strong_password: Optional[str] = None, + smart_card_secret: Optional[bytes] = None, + verify_blob: Optional[bytes] = None, +) -> bytes: + """Computes the decryption key for Type1 DPAPI blob, given the master key and optional information. + + This implementation relies on a faulty implementation from Microsoft that does not respect the HMAC RFC. + Instead of updating the inner pad, we update the outer pad. + This algorithm is also used when checking the HMAC for integrity after decryption. + + Args: + master_key: Decrypted master key (should be 64 bytes long). + nonce: This is the nonce contained in the blob or the HMAC in the blob (integrity check). + hash_algorithm: A :class:`HashAlgorithm` to use for calculating block sizes. + entropy: This is the optional entropy from ``CryptProtectData()`` API. + strong_password: Optional password used for decryption or the blob itself. + smart_card_secret: Optional MS Next Gen Crypto secret (e.g. from PIN code). + verify_blob: Optional encrypted blob used for integrity check. + Returns: + decryption key + """ + if len(master_key) > 20: + master_key = hashlib.sha1(master_key).digest() + + master_key = master_key.ljust(hash_algorithm.block_length, b"\x00") + pad1 = bytes(c ^ 0x36 for c in master_key) + pad2 = bytes(c ^ 0x5C for c in master_key) + + digest1 = hashlib.new(hash_algorithm.name) + digest1.update(pad2) + + digest2 = hashlib.new(hash_algorithm.name) + digest2.update(pad1) + digest2.update(nonce) + + if entropy and smart_card_secret: + digest2.update(entropy + smart_card_secret) + if verify_blob: + digest2.update(verify_blob) + + digest1.update(digest2.digest()) + if entropy and not smart_card_secret: + digest1.update(entropy) + + if strong_password: + strong_password = hashlib.sha1(strong_password.rstrip("\x00").encode("utf-16-le")).digest() + digest1.update(strong_password) + + if verify_blob and not smart_card_secret: + digest1.update(verify_blob) + + return digest1.digest() + + +def crypt_session_key_type2( + masterkey: bytes, + nonce: bytes, + hash_algorithm: HashAlgorithm, + entropy: Optional[bytes] = None, + strong_password: Optional[str] = None, + smart_card_secret: Optional[bytes] = None, + verify_blob: Optional[bytes] = None, +) -> bytes: + """Computes the decryption key for Type2 DPAPI blob, given the masterkey and optional information. + + This implementation relies on an RFC compliant HMAC implementation. + This algorithm is also used when checking the HMAC for integrity after decryption. + + Args: + master_key: Decrypted master key (should be 64 bytes long). + nonce: This is the nonce contained in the blob or the HMAC in the blob (integrity check). + hash_algo: A :class:`HashAlgorithm` to use for calculating block sizes. + entropy: This is the optional entropy from ``CryptProtectData()`` API. + strong_password: Optional password used for decryption or the blob itself. + smart_card_secret: Optional MS Next Gen Crypto secret (e.g. from PIN code). Only for API compatibility. + verify_blob: Optional encrypted blob used for integrity check. + Returns: + decryption key + """ + if len(masterkey) > 20: + masterkey = hashlib.sha1(masterkey).digest() + + digest = hmac.new(masterkey, digestmod=hash_algorithm.name) + digest.update(nonce) + + if entropy: + digest.update(entropy) + + if strong_password: + strong_password = hashlib.sha512(strong_password.rstrip("\x00").encode("utf-16-le")).digest() + digest.update(strong_password) + + elif verify_blob: + digest.update(verify_blob) + + return digest.digest() + + +def derive_password_hash(password_hash: bytes, user_sid: str, digest: str = "sha1") -> bytes: + """Internal use. Computes the encryption key from a user's password hash.""" + return hmac.new(password_hash, (user_sid + "\0").encode("utf-16-le"), digestmod=digest).digest() diff --git a/dissect/target/plugins/os/windows/dpapi/dpapi.py b/dissect/target/plugins/os/windows/dpapi/dpapi.py new file mode 100644 index 000000000..befaca5ad --- /dev/null +++ b/dissect/target/plugins/os/windows/dpapi/dpapi.py @@ -0,0 +1,139 @@ +import hashlib +import re +from functools import cached_property +from pathlib import Path + +from Crypto.Cipher import AES + +from dissect.target import Target +from dissect.target.exceptions import UnsupportedPluginError +from dissect.target.plugin import InternalPlugin +from dissect.target.plugins.os.windows.dpapi.blob import Blob as DPAPIBlob +from dissect.target.plugins.os.windows.dpapi.master_key import CredSystem, MasterKeyFile + + +class DPAPIPlugin(InternalPlugin): + __namespace__ = "dpapi" + + # This matches master key file names + MASTER_KEY_REGEX = re.compile("^[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}$") + + SECURITY_POLICY_KEY = "HKEY_LOCAL_MACHINE\\SECURITY\\Policy" + SYSTEM_KEY = "HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\LSA" + + SYSTEM_USERNAME = "System" + + def __init__(self, target: Target): + super().__init__(target) + + def check_compatible(self) -> None: + if not list(self.target.registry.keys(self.SYSTEM_KEY)): + raise UnsupportedPluginError(f"Registry key not found: {self.SYSTEM_KEY}") + + @cached_property + def syskey(self) -> bytes: + lsa = self.target.registry.key(self.SYSTEM_KEY) + syskey_keys = ["JD", "Skew1", "GBG", "Data"] + # This magic value rotates the order of the data + alterator = [0x8, 0x5, 0x4, 0x2, 0xB, 0x9, 0xD, 0x3, 0x0, 0x6, 0x1, 0xC, 0xE, 0xA, 0xF, 0x7] + + r = bytes.fromhex("".join([lsa.subkey(key).class_name for key in syskey_keys])) + return bytes(r[i] for i in alterator) + + @cached_property + def lsakey(self) -> bytes: + policy_key = "PolEKList" + + encrypted_key = self.target.registry.key(self.SECURITY_POLICY_KEY).subkey(policy_key).value("(Default)").value + + lsa_key = _decrypt_aes(encrypted_key, self.syskey) + + return lsa_key[68:100] + + @cached_property + def secrets(self) -> dict[str, bytes]: + result = {} + + reg_secrets = self.target.registry.key(self.SECURITY_POLICY_KEY).subkey("Secrets") + for subkey in reg_secrets.subkeys(): + enc_data = subkey.subkey("CurrVal").value("(Default)").value + secret = _decrypt_aes(enc_data, self.lsakey) + result[subkey.name] = secret + + return result + + @cached_property + def master_keys(self) -> dict[str, dict[str, MasterKeyFile]]: + # This assumes that there is no user named System. + # As far as I can tell, the name "System" is saved for the actual System user + # Therefore the user can't actually exist in `all_with_home` + result = {self.SYSTEM_USERNAME: {}} + + system_master_key_path = self.target.fs.path("sysvol/Windows/System32/Microsoft/Protect/S-1-5-18") + system_user_master_key_path = system_master_key_path.joinpath("User") + + for dir in [system_master_key_path, system_user_master_key_path]: + user_mks = self._load_master_keys_from_path(self.SYSTEM_USERNAME, dir) + result[self.SYSTEM_USERNAME].update(user_mks) + + for user in self.target.user_details.all_with_home(): + path = user.home_path.joinpath("AppData/Roaming/Microsoft/Protect").joinpath(user.user.sid) + user_mks = self._load_master_keys_from_path(user.user.name, path) + if user_mks: + result[user.user.name] = user_mks + + return result + + def _load_master_keys_from_path(self, username: str, path: Path) -> dict[str, MasterKeyFile]: + if not path.exists(): + return {} + + result = {} + for file in path.iterdir(): + if self.MASTER_KEY_REGEX.findall(file.name): + with file.open() as fh: + mkf = MasterKeyFile(fh) + + if username == self.SYSTEM_USERNAME: + dpapi_system = CredSystem(self.secrets["DPAPI_SYSTEM"][16:]) + + if not mkf.decrypt_with_key(dpapi_system.machine_key): + mkf.decrypt_with_key(dpapi_system.user_key) + + # This should not be possible, decrypting the System master key should always succeed + if not mkf.decrypted: + raise Exception("Failed to decrypt System master key") + + result[file.name] = mkf + + return result + + def decrypt_system_blob(self, data: bytes) -> bytes: + blob = DPAPIBlob(data) + + if not (mk := self.master_keys.get(self.SYSTEM_USERNAME, {}).get(blob.guid)): + raise ValueError("Blob UUID is unknown to system master keys") + + if not blob.decrypt(mk.key): + raise ValueError("Failed to decrypt system blob") + + return blob.clear_text + + +def _decrypt_aes(data: bytes, key: bytes) -> bytes: + ctx = hashlib.sha256() + ctx.update(key) + + tmp = data[28:60] + for _ in range(1, 1000 + 1): + ctx.update(tmp) + + aeskey = ctx.digest() + iv = b"\x00" * 16 + + result = [] + for i in range(60, len(data), 16): + cipher = AES.new(aeskey, AES.MODE_CBC, iv) + result.append(cipher.decrypt(data[i : i + 16].ljust(16, b"\x00"))) + + return b"".join(result) diff --git a/dissect/target/plugins/os/windows/dpapi/master_key.py b/dissect/target/plugins/os/windows/dpapi/master_key.py new file mode 100644 index 000000000..f909cedf3 --- /dev/null +++ b/dissect/target/plugins/os/windows/dpapi/master_key.py @@ -0,0 +1,177 @@ +import hashlib +from io import BytesIO +from typing import BinaryIO + +from dissect.cstruct import cstruct + +from dissect.target.plugins.os.windows.dpapi.crypto import ( + CipherAlgorithm, + HashAlgorithm, + derive_password_hash, + dpapi_hmac, +) + +master_key_def = """ +struct DomainKey { + DWORD dwVersion; + DWORD secretLen; + DWORD accessCheckLen; + char guid[16]; + char encryptedSecret[secretLen]; + char accessCheckLen[accessCheckLen]; +}; + +struct CredHist { + DWORD dwVersion; + char guid[16]; +}; + +struct MasterKey { + DWORD dwVersion; + char pSalt[16]; + DWORD dwPBKDF2IterationCount; + DWORD HMACAlgId; // This is actually ALG_ID + DWORD CryptAlgId; // This is actually ALG_ID + // BYTE pKey[]; +}; + +struct CredSystem { + DWORD dwRevision; + char pMachine[20]; + char pUser[20]; +}; + +struct MasterKeyFileHeader { + DWORD dwVersion; // Masterkey version. Should be 1 or 2 + DWORD dwReserved1; + DWORD dwReserved2; + WCHAR szGuid[36]; // GUID of master key. Should match filename + DWORD dwUnused1; + DWORD dwUnused2; + DWORD dwPolicy; + QWORD qwUserKeySize; + QWORD qwLocalEncKeySize; + QWORD qwLocalKeySize; + QWORD qwDomainKeySize; +}; +""" +c_master_key = cstruct() +c_master_key.load(master_key_def) + + +class MasterKey: + def __init__(self, data: bytes) -> None: + buf = BytesIO(data) + self._mk = c_master_key.MasterKey(buf) + self._mk_key = buf.read() + + self.key = None + self.key_hash = None + self.decrypted = False + + def decrypt_with_hash(self, user_sid: str, password_hash: bytes) -> bool: + """Decrypts the master key with the given user's SID and password hash.""" + return self.decrypt_with_key(derive_password_hash(password_hash, user_sid)) + + def decrypt_with_hash_10(self, user_sid: str, password_hash: bytes) -> bool: + """Decrypts the master key with the given user's hash and SID. + + Newer version of :meth:`~MasterKey.decrypt_with_hash` + """ + user_sid_encoded = user_sid.encode("utf-16-le") + pwd_hash1 = hashlib.pbkdf2_hmac("sha256", password_hash, user_sid_encoded, 10000) + pwd_hash2 = hashlib.pbkdf2_hmac("sha256", pwd_hash1, user_sid_encoded, 1)[0:16] + return self.decrypt_with_key(derive_password_hash(pwd_hash2, user_sid)) + + def decrypt_with_password(self, user_sid: str, pwd: str) -> bool: + """Decrypts the master key with the given user's password and SID.""" + for algo in ["sha1", "md4"]: + pwd_hash = hashlib.new(algo, pwd.encode("utf-16-le")).digest() + self.decrypt_with_key(derive_password_hash(pwd_hash, user_sid)) + if self.decrypted: + break + + return self.decrypted + + def decrypt_with_key(self, key: bytes) -> bool: + """Decrypts the master key with the given encryption key. + + This function also extracts the HMAC part of the decrypted data and compares it with the computed one. + + Note that once successfully decrypted, this function turns into a no-op. + """ + if self.decrypted: + return True + + if not self._mk_key: + return False + + # Compute encryption key + hash_algo = HashAlgorithm.from_id(self._mk.HMACAlgId) + cipher_algo = CipherAlgorithm.from_id(self._mk.CryptAlgId) + + data = cipher_algo.decrypt_with_hmac( + self._mk_key, + key, + self._mk.pSalt, + hash_algo, + self._mk.dwPBKDF2IterationCount, + ) + + self.key = data[-64:] + self.hmac_salt = data[:16] + self.hmac = data[16 : 16 + int(hash_algo.digest_length)] + self.hmac_computed = dpapi_hmac(key, self.hmac_salt, self.key, hash_algo) + self.decrypted = self.hmac == self.hmac_computed + if self.decrypted: + self.key_hash = hashlib.sha1(self.key).digest() + + return self.decrypted + + +class MasterKeyFile: + def __init__(self, fh: BinaryIO): + self._mk_header = c_master_key.MasterKeyFileHeader(fh) + self._user_mk = None + + # User Master Key + if self._mk_header.qwUserKeySize: + self._user_mk = MasterKey(fh.read(self._mk_header.qwUserKeySize)) + + # Here we would also parse the rest of the keys, but as of now we don't decrypt them + self._backup_mk = None + self._credhist_mk = None + self._domain_mk = None + + @property + def decrypted(self) -> bool: + return self._user_mk.decrypted + + @property + def key(self) -> bytes: + return self._user_mk.key + + def decrypt_with_hash(self, user_sid: str, password_hash: bytes) -> bool: + """See :meth:`MasterKey.decrypt_with_hash` and :meth:`MasterKey.decrypt_with_hash_10`.""" + if not self._user_mk.decrypted: + self._user_mk.decrypt_with_hash_10(user_sid, password_hash) + + if not self._user_mk.decrypted: + self._user_mk.decrypt_with_hash(user_sid, password_hash) + + return self._user_mk.decrypted + + def decrypt_with_password(self, user_sid: str, pwd: str) -> bool: + """See :meth:`MasterKey.decrypt_with_password`.""" + return self._user_mk.decrypt_with_password(user_sid, pwd) + + def decrypt_with_key(self, key: bytes) -> bool: + """See :meth:`MasterKey.decrypt_with_key`.""" + return self._user_mk.decrypt_with_key(key) + + +class CredSystem: + def __init__(self, buf: bytes): + self._struct = c_master_key.CredSystem(buf) + self.machine_key = self._struct.pMachine + self.user_key = self._struct.pUser diff --git a/dissect/target/plugins/os/windows/sam.py b/dissect/target/plugins/os/windows/sam.py index 5bf1101f7..251bc42a8 100644 --- a/dissect/target/plugins/os/windows/sam.py +++ b/dissect/target/plugins/os/windows/sam.py @@ -290,23 +290,10 @@ class SamPlugin(Plugin): """ SAM_KEY = "HKEY_LOCAL_MACHINE\\SAM\\SAM\\Domains\\Account" - SYSTEM_KEY = "HKEY_LOCAL_MACHINE\\SYSTEM\\CurrentControlSet\\Control\\LSA" def check_compatible(self) -> None: - for key in [self.SAM_KEY, self.SYSTEM_KEY]: - if not len(list(self.target.registry.keys(key))) > 0: - raise UnsupportedPluginError(f"Registry key not found: {key}") - - def retrieve_syskey(self) -> bytes: - lsa = self.target.registry.key(self.SYSTEM_KEY) - syskey_keys = ["JD", "Skew1", "GBG", "Data"] - permutation_matrix = [0x8, 0x5, 0x4, 0x2, 0xB, 0x9, 0xD, 0x3, 0x0, 0x6, 0x1, 0xC, 0xE, 0xA, 0xF, 0x7] - - r = b"" - for key in syskey_keys: - r += bytes.fromhex(lsa.subkey(key).class_name) - - return bytes(r[i] for i in permutation_matrix) + if not len(list(self.target.registry.keys(self.SAM_KEY))) > 0: + raise UnsupportedPluginError(f"Registry key not found: {self.SAM_KEY}") def calculate_samkey(self, syskey: bytes) -> bytes: aqwerty = b"!@#$%^&*()qwertyUIOPAzxcvbnmQQQQQQQQQQQQ)(*@&%\0" @@ -373,7 +360,7 @@ def sam(self) -> Iterator[SamRecord]: nt (string): Parsed NT-hash. """ - syskey = self.retrieve_syskey() # aka. bootkey + syskey = self.target.dpapi.syskey # aka. bootkey samkey = self.calculate_samkey(syskey) # aka. hashed bootkey or hbootkey almpassword = b"LMPASSWORD\0" diff --git a/tests/data/plugins/os/windows/dpapi/master_keys/d8ef5e00-328a-4919-9ab6-c058f493a6e3 b/tests/data/plugins/os/windows/dpapi/master_keys/d8ef5e00-328a-4919-9ab6-c058f493a6e3 new file mode 100644 index 000000000..862848ce0 --- /dev/null +++ b/tests/data/plugins/os/windows/dpapi/master_keys/d8ef5e00-328a-4919-9ab6-c058f493a6e3 @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:d4ce87d63e9ca92a41a6119bcdb9dd3cf2b809abfc978b8a257f3112c22c9ef7 +size 468 diff --git a/tests/data/plugins/os/windows/dpapi/test_data.dpapi b/tests/data/plugins/os/windows/dpapi/test_data.dpapi new file mode 100644 index 000000000..10269fab8 --- /dev/null +++ b/tests/data/plugins/os/windows/dpapi/test_data.dpapi @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:384eecdbf9627eb51575107b8c0cfeff9f6d7c3567134a6faf66740a146a8c8a +size 230 diff --git a/tests/test_helpers_hashutil.py b/tests/test_helpers_hashutil.py index 40b302a91..4f1d0708c 100644 --- a/tests/test_helpers_hashutil.py +++ b/tests/test_helpers_hashutil.py @@ -15,13 +15,12 @@ def create_mocked_windows_target(): return mocked_target -@patch("dissect.target.helpers.hashutil.ResolverPlugin.resolve") -def test_hash_uri(resolver): +def test_hash_uri(): """Determine hash functions""" target = create_mocked_windows_target() test_file = open(__file__, "rb") - resolver.return_value = test_file + target.resolve.return_value = test_file output = hashutil.hash_uri(target, __file__) assert output[0].name == __file__ assert len(output[1][0]) == 32 @@ -29,8 +28,7 @@ def test_hash_uri(resolver): assert len(output[1][2]) == 64 -@patch("dissect.target.helpers.hashutil.ResolverPlugin.resolve") -def test_hash_uri_none(resolver): +def test_hash_uri_none(): """Determine hash functions""" with pytest.raises(FileNotFoundError): hashutil.hash_uri(Mock(), None) diff --git a/tests/test_plugins_os_windows_dpapi.py b/tests/test_plugins_os_windows_dpapi.py new file mode 100644 index 000000000..0c4693d79 --- /dev/null +++ b/tests/test_plugins_os_windows_dpapi.py @@ -0,0 +1,43 @@ +from dissect.target.helpers.regutil import VirtualKey +from dissect.target.plugins.os.windows.dpapi.dpapi import DPAPIPlugin + +from ._utils import absolute_path + +SYSTEM_KEY_PATH = "SYSTEM\\ControlSet001\\Control\\LSA" +POLICY_KEY_PATH = "SECURITY\\Policy\\PolEKList" +DPAPI_KEY_PATH = "SECURITY\\Policy\\Secrets\\DPAPI_SYSTEM\\CurrVal" + + +def test_dpapi_decrypt_blob(target_win_users, fs_win, hive_hklm): + # Create SYSTEM keys + system_key = VirtualKey(hive_hklm, SYSTEM_KEY_PATH) + data_key = VirtualKey(hive_hklm, "Data", class_name="13f032ed") + system_key.add_subkey("Data", data_key) + gbg_key = VirtualKey(hive_hklm, "GBG", class_name="c21c0976") + system_key.add_subkey("GBG", gbg_key) + jd_key = VirtualKey(hive_hklm, "JD", class_name="ea08e1ce") + system_key.add_subkey("JD", jd_key) + skew1_key = VirtualKey(hive_hklm, "Skew1", class_name="f83ed834") + system_key.add_subkey("Skew1", skew1_key) + hive_hklm.map_key(SYSTEM_KEY_PATH, system_key) + + policy_key = VirtualKey(hive_hklm, POLICY_KEY_PATH) + policy_key_value = b"\x00\x00\x00\x01\xec\xff\xe1{*\x99t@\xaa\x93\x9a\xdb\xff&\xf1\xfc\x03\x00\x00\x00\x00\x00\x00\x00~\x12z\xb2cE.\xa5Q\x7fkD\x97\xe7\xf4\xb2\x99R\xd0\x80r\xf9/\x8b\xbb\x81\xd1\x807\xba\xa6\xe8`\xde\xe3\x1e\xa8S\x14i\xac\"$\x14\xf2$\n\xf8'_\x17I\xa9\x9b\xbb#mR\xc5\xee\x90\xeed1\xaa\xdcf\x811e j\xcdhWR\x1d a\x1e_\x01\xcb\x96\xbb\xa6\xc7t\x93p\xba>\xc5?\xb2.M\x88\x9drTX\x8f\x01H\xb3B6dZS\xc7\x9d\x99}9\x9eD\xdcJ\xd9\xfb\xc3\x92\x8c\x87W\x95\x06\x93\xcb{\xea\xff\xa62\x0f\xc8\x9c\x08\x8e/`\x15" # noqa + policy_key.add_value("(Default)", policy_key_value) + hive_hklm.map_key(POLICY_KEY_PATH, policy_key) + + secrets_key = VirtualKey(hive_hklm, DPAPI_KEY_PATH) + secrets_key_value = b"\x00\x00\x00\x01Z7%\xd6\x9e\xc9\x9e/\x15h\xb3\x13N\x8a\xc1\xfc\x03\x00\x00\x00\x00\x00\x00\x00\xc0'\xaay\xaa\x038\x82\x7f\xe4\xf0\x1b+\x0e]\x1e\x86\x9b\xfc\xeb%I/K(\xc5\xea\xd9i\\#\xff\x0c\xc4\xc1wF1\xfav\r0\x05\x19\xff-\xf2G)\xab`\xad\xb3\x02\x0bRz\xadj\x8d\xff\x9a-yE\xeb\x98iz\xc7\xa8g\xf7C\xa5\x0bJ\x04\x1e\x8b\x96\xec\xe24I\xd3\xc7_)\xea\xea]\"\xae\xe4\xe7" # noqa + secrets_key.add_value("(Default)", secrets_key_value) + hive_hklm.map_key(DPAPI_KEY_PATH, secrets_key) + + fs_win.map_file( + "Windows/System32/Microsoft/Protect/S-1-5-18/d8ef5e00-328a-4919-9ab6-c058f493a6e3", + absolute_path("data/plugins/os/windows/dpapi/master_keys/d8ef5e00-328a-4919-9ab6-c058f493a6e3"), + ) + + target_win_users.add_plugin(DPAPIPlugin) + + with open(absolute_path("data/plugins/os/windows/dpapi/test_data.dpapi"), "rb") as encrypted_blob: + decrypted = target_win_users.dpapi.decrypt_system_blob(encrypted_blob.read()) + assert decrypted == b"TestData"