forked from SkyLined/mBugId
-
Notifications
You must be signed in to change notification settings - Fork 0
/
cCorruptionDetector.py
69 lines (64 loc) · 3.56 KB
/
cCorruptionDetector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import hashlib;
from dxBugIdConfig import dxBugIdConfig;
from fsGetNumberDescription import fsGetNumberDescription;
class cCorruptionDetector(object):
# Can be used to check for memory corruption
def __init__(oCorruptionDetector, oCdbWrapper):
oCorruptionDetector.oCdbWrapper = oCdbWrapper;
oCorruptionDetector.dsCorruptedBytesHex_by_uAddress = {};
oCorruptionDetector.bCorruptionDetected = False;
oCorruptionDetector.uCorruptionStartAddress = None; # first corrupted byte as detected by fDetectCorruption
oCorruptionDetector.uCorruptionSize = None;
oCorruptionDetector.uCorruptionEndAddress = None; # first non-corrupted byte after corruption
def fbDetectCorruption(oCorruptionDetector, uStartAddress, axExpectedBytes):
aauExpectedBytes = [isinstance(xExpectedBytes, list) and xExpectedBytes or [xExpectedBytes] for xExpectedBytes in axExpectedBytes];
auBytes = oCorruptionDetector.oCdbWrapper.fauGetBytes(uStartAddress, len(axExpectedBytes));
for uOffset in xrange(len(axExpectedBytes)):
uAddress = uStartAddress + uOffset;
auExpectedBytes = aauExpectedBytes[uOffset];
uByte = auBytes[uOffset];
if uByte not in auExpectedBytes:
oCorruptionDetector.dsCorruptedBytesHex_by_uAddress[uAddress] = uByte is None and "??" or "%02X" % uByte;
if not oCorruptionDetector.bCorruptionDetected:
oCorruptionDetector.bCorruptionDetected = True;
oCorruptionDetector.uCorruptionStartAddress = uAddress;
oCorruptionDetector.uCorruptionEndAddress = uAddress + 1;
elif uAddress < oCorruptionDetector.uCorruptionStartAddress:
oCorruptionDetector.uCorruptionStartAddress = uAddress;
elif uAddress >= oCorruptionDetector.uCorruptionEndAddress:
oCorruptionDetector.uCorruptionEndAddress = uAddress + 1;
oCorruptionDetector.uCorruptionSize = \
oCorruptionDetector.uCorruptionEndAddress - oCorruptionDetector.uCorruptionStartAddress;
return oCorruptionDetector.bCorruptionDetected;
def fatxMemoryRemarks(oCorruptionDetector):
# Coalese corrupted bytes into blocks where possible, then create remarks for each such block or single byte.
atxMemoryRemarks = [];
uStartAddress = None;
uLength = None;
for uAddress in sorted(oCorruptionDetector.dsCorruptedBytesHex_by_uAddress.keys()):
if uStartAddress is None or uAddress != uStartAddress + uLength:
if uStartAddress is not None:
atxMemoryRemarks.append(("Corrupted", uStartAddress, uLength));
uStartAddress = uAddress;
uLength = 1;
else:
uLength += 1;
if uStartAddress is not None:
atxMemoryRemarks.append(("Corrupted", uStartAddress, uLength));
return atxMemoryRemarks;
def fasCorruptedBytes(oCorruptionDetector):
# xrange can't handle longs, so we have to work around that:
uStartAddress = oCorruptionDetector.uCorruptionStartAddress;
uLength = oCorruptionDetector.uCorruptionEndAddress - uStartAddress;
return [
oCorruptionDetector.dsCorruptedBytesHex_by_uAddress.get(uStartAddress + uOffset, "??")
for uOffset in xrange(uLength)
];
def fsCorruptionId(oCorruptionDetector):
uCorruptionLength = oCorruptionDetector.uCorruptionEndAddress - oCorruptionDetector.uCorruptionStartAddress;
sId = "~%s" % fsGetNumberDescription(uCorruptionLength);
if dxBugIdConfig["uHeapCorruptedBytesHashChars"]:
oHasher = hashlib.md5();
oHasher.update("".join(oCorruptionDetector.fasCorruptedBytes()));
sId += "#%s" % oHasher.hexdigest()[:dxBugIdConfig["uHeapCorruptedBytesHashChars"]];
return sId;