From 7bd676205a30b9308a79d3d068bb3a2fa8fa6a00 Mon Sep 17 00:00:00 2001 From: Charlie Bromberg <40902872+ShutdownRepo@users.noreply.github.com> Date: Tue, 19 Mar 2024 19:48:33 +0100 Subject: [PATCH] Dcshadowdev (#45) * added testing stuff * Revert "added testing stuff" This reverts commit 1b102e5d6d133daa88a09149583672f5e3f085cd. * added testing stuff * add printing in gssapi unwrap * modified calls to AES-CTS encrypt function * additional printing * better printing using hexdump * test.py now has a working example. Take encrypted DsBindResponse from dcshadow wireshark decrypt it, and produces again same encrypted stuff with same signature by taking the padding bytes and the confounder from the encrypted message * added some logs * updating gssapi and crypto for aes-cts wrap, unwrap, encryp, decrypt * last changes for tonight * some updates * ncchanges baby! * reverting change on getMIC * reverting change on kerberosv5.py * reverting change on kerberosv5.py * reverting change on kerberosv5.py * reverting change on kerberosv5.py --------- Co-authored-by: MrAle98 --- impacket/krb5/crypto.py | 34 ++++++++++++++------ impacket/krb5/gssapi.py | 71 +++++++++++++++++++++++++++++++---------- 2 files changed, 79 insertions(+), 26 deletions(-) diff --git a/impacket/krb5/crypto.py b/impacket/krb5/crypto.py index 673554bf9..885387c5d 100644 --- a/impacket/krb5/crypto.py +++ b/impacket/krb5/crypto.py @@ -39,6 +39,7 @@ from binascii import unhexlify from functools import reduce from os import urandom + # XXX current status: # * Done and tested # - AES encryption, checksum, string2key, prf @@ -60,7 +61,6 @@ from Cryptodome.Util.number import GCD as gcd from six import b, PY3, indexbytes, binary_type - def get_random_bytes(lenBytes): # We don't really need super strong randomness here to use PyCrypto.Random return urandom(lenBytes) @@ -210,17 +210,23 @@ def derive(cls, key, constant): return cls.random_to_key(rndseed[0:cls.seedsize]) @classmethod - def encrypt(cls, key, keyusage, plaintext, confounder): + def encrypt(cls, key, keyusage, plaintext, confounder, integrity_blob=None): ki = cls.derive(key, pack('>IB', keyusage, 0x55)) ke = cls.derive(key, pack('>IB', keyusage, 0xAA)) if confounder is None: confounder = get_random_bytes(cls.blocksize) basic_plaintext = confounder + _zeropad(plaintext, cls.padsize) - hmac = HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest() - return cls.basic_encrypt(ke, basic_plaintext) + hmac[:cls.macsize] + # For Kerberos SSP: blob for HMAC calculation is different from blob for encryption. + # This is undocumented, and was identified by reversing cryptodll.dll + if integrity_blob is not None: + hmac = HMAC.new(ki.contents, integrity_blob, cls.hashmod).digest() + else: + hmac = HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest() + enc = cls.basic_encrypt(ke, basic_plaintext) + hmac[:cls.macsize] + return enc @classmethod - def decrypt(cls, key, keyusage, ciphertext): + def decrypt(cls, key, keyusage, ciphertext, ignore_integrity=False, wrap_token_len=None, filler_len=None, dce_rpc_header=None, auth_data_header=None): ki = cls.derive(key, pack('>IB', keyusage, 0x55)) ke = cls.derive(key, pack('>IB', keyusage, 0xAA)) if len(ciphertext) < cls.blocksize + cls.macsize: @@ -229,12 +235,22 @@ def decrypt(cls, key, keyusage, ciphertext): if len(basic_ctext) % cls.padsize != 0: raise ValueError('ciphertext does not meet padding requirement') basic_plaintext = cls.basic_decrypt(ke, bytes(basic_ctext)) - hmac = bytearray(HMAC.new(ki.contents, basic_plaintext, cls.hashmod).digest()) + plaintext, confounder = bytes(basic_plaintext[cls.blocksize:]), bytes(basic_plaintext[:cls.blocksize]) + # For Kerberos SSP: blob for HMAC calculation is different from blob for encryption. + # This is undocumented, and was identified by reversing cryptodll.dll + # in that case, integrity_blob to send to HMAC + # = confounder + dce_rpc_header + data + auth_data_header + filler + wrap_token + # no need to implement, assuming the other client is legit + if dce_rpc_header is not None and auth_data_header is not None: + plaintext_data, plaintext_filler_and_wrap_token_header = plaintext[:-filler_len-wrap_token_len], plaintext[-filler_len-wrap_token_len:] # FIXME dynamically set the filler size + integrity_blob = confounder + dce_rpc_header + plaintext_data + auth_data_header + plaintext_filler_and_wrap_token_header + hmac = bytearray(HMAC.new(ki.contents, integrity_blob, SHA).digest()) + else: + hmac = bytearray(HMAC.new(ki.contents, basic_plaintext, SHA).digest()) expmac = hmac[:cls.macsize] - if not _mac_equal(mac, expmac): + if not _mac_equal(mac, expmac) and not ignore_integrity: raise InvalidChecksum('ciphertext integrity failure') - # Discard the confounder. - return bytes(basic_plaintext[cls.blocksize:]) + return bytes(basic_plaintext[cls.blocksize:]), bytes(basic_plaintext[:cls.blocksize]) @classmethod def prf(cls, key, string): diff --git a/impacket/krb5/gssapi.py b/impacket/krb5/gssapi.py index f30060acf..73d7e229b 100644 --- a/impacket/krb5/gssapi.py +++ b/impacket/krb5/gssapi.py @@ -18,6 +18,7 @@ import struct import random import string + from six import b from Cryptodome.Hash import HMAC, MD5 @@ -123,7 +124,7 @@ def GSS_GetMIC(self, sessionKey, data, sequenceNumber, direction = 'init'): finalData = GSS_GETMIC_HEADER + token.getData() return finalData - def GSS_Wrap(self, sessionKey, data, sequenceNumber, direction = 'init', encrypt=True, authData=None): + def GSS_Wrap(self, sessionKey, data, sequenceNumber, direction='init', encrypt=True, authData=None): # Damn inacurate RFC, useful info from here # https://social.msdn.microsoft.com/Forums/en-US/fb98e8f4-e697-4652-bcb7-604e027e14cc/gsswrap-token-size-kerberos-and-rc4hmac?forum=os_windowsprotocols # and here @@ -246,27 +247,50 @@ def unrotate(self, data, numBytes): numBytes %= len(data) result = data[numBytes:] + data[:numBytes] return result + + def get_filler(self, data): + cipher = self.cipherType() + length = (cipher.blocksize - (len(data) % cipher.blocksize)) or 16 + filler = b'\x00' * length + return filler + - def GSS_Wrap(self, sessionKey, data, sequenceNumber, direction = 'init', encrypt=True): + def GSS_Wrap(self, sessionKey, data, sequenceNumber, direction='init', encrypt=True, acceptorSubkey=False, keyUsage=KG_USAGE_INITIATOR_SEAL, confounder=None, dce_rpc_header=None, auth_data_header=None): token = self.WRAP() cipher = self.cipherType() - # Let's pad the data - pad = (cipher.blocksize - (len(data) % cipher.blocksize)) & 15 - padStr = b'\xFF' * pad - data += padStr + if confounder is None: + confounder = b(''.join([rand.choice(string.ascii_letters) for _ in range(16)])) - # The RRC field ([RFC4121] section 4.2.5) is 12 if no encryption is requested or 28 if encryption + # The RRC field ([RFC4121] section 4.2.5) is 12 if no encryption is requested or 28 if encryption # is requested. The RRC field is chosen such that all the data can be encrypted in place. rrc = 28 - token['Flags'] = 6 - token['EC'] = pad + token['Flags'] = 0 + if acceptorSubkey: + token['Flags'] |= 4 # AcceptorSubKey flag (0x00100) + if encrypt: + token['Flags'] |= 2 # Sealed flag (0x00010) + if direction != 'init': + token['Flags'] |= 1 # SendByAcceptor flag (0x00001) + + token['EC'] = len(self.get_filler(data)) + # (RFC4121 section 4.2.4) the RRC field (as + # defined in section 4.2.5) in the to-be-encrypted header contains the + # hex value 00 00. token['RRC'] = 0 - token['SND_SEQ'] = struct.pack('>Q',sequenceNumber) + token['SND_SEQ'] = struct.pack('>Q', sequenceNumber) + + # For Kerberos SSP: blob for HMAC calculation is different from blob for encryption. + # This is undocumented, and was identified by reversing cryptodll.dll + integrity_blob = None + if dce_rpc_header is not None and auth_data_header is not None: + integrity_blob = confounder + dce_rpc_header + data + auth_data_header + self.get_filler(data) + token.getData() - cipherText = cipher.encrypt(sessionKey, KG_USAGE_INITIATOR_SEAL, data + token.getData(), None) + data += self.get_filler(data) + + cipherText = cipher.encrypt(key=sessionKey, keyusage=keyUsage, plaintext=data + token.getData(), confounder=confounder, integrity_blob=integrity_blob) token['RRC'] = rrc cipherText = self.rotate(cipherText, token['RRC'] + token['EC']) @@ -277,18 +301,31 @@ def GSS_Wrap(self, sessionKey, data, sequenceNumber, direction = 'init', encrypt return ret1, ret2 - def GSS_Unwrap(self, sessionKey, data, sequenceNumber, direction = 'init', encrypt=True, authData=None): + def GSS_Unwrap(self, sessionKey, data, sequenceNumber, direction='init', encrypt=True, authData=None, keyUsage=KG_USAGE_ACCEPTOR_SEAL, dce_rpc_header=None, auth_data_header=None, ignore_integrity=False): + # implementation as per [MS-KILE] Section 4.3 from impacket.dcerpc.v5.rpcrt import SEC_TRAILER cipher = self.cipherType() token = self.WRAP(authData[len(SEC_TRAILER()):]) - rotated = authData[len(self.WRAP())+len(SEC_TRAILER()):] + data - - cipherText = self.unrotate(rotated, token['RRC'] + token['EC']) - plainText = cipher.decrypt(sessionKey, KG_USAGE_ACCEPTOR_SEAL, cipherText) + enc1 = authData[len(self.WRAP()) + len(SEC_TRAILER()):] + enc2 = data + + cipherText = self.unrotate(enc1 + enc2, token['RRC'] + token['EC']) + + # For Kerberos SSP: blob for HMAC calculation is different from blob for encryption. + # This is undocumented, and was identified by reversing cryptodll.dll + # no need to implement, assuming the other client is legit + plainText, confounder = cipher.decrypt( + sessionKey, keyUsage, cipherText, + dce_rpc_header=dce_rpc_header, auth_data_header=auth_data_header, ignore_integrity=ignore_integrity, + wrap_token_len=len(self.WRAP()), filler_len=token['EC'] + ) + + encdata, enchdr = plainText[:-len(self.WRAP())], plainText[-len(self.WRAP()):] + encdata, filler = encdata[:-token['EC']], encdata[-token['EC']:] - return plainText[:-(token['EC']+len(self.WRAP()))], None + return encdata, confounder class GSSAPI_AES256(GSSAPI_AES): checkSumProfile = crypto._SHA1AES256