In [1]:
import triton
import keystone
import SupertracePybind as Supertrace
from enum import Enum
from collections import defaultdict
import graphviz
from miasm.arch.x86.arch import mn_x86
from miasm.expression.expression import ExprMem, ExprId, ExprInt, ExprOp
from supertrace_util import initTritonCtxEnv, mergeRepeatIns, checkIndirectIns

In [2]:
ks = keystone.Ks(keystone.KS_ARCH_X86, keystone.KS_MODE_64)
def asmDecode(CODE: bytes, addr = 0) -> bytes:
    try:
        encoding, count = ks.asm(CODE, addr, True)
        if (count <= 0):
            raise keystone.KsError
        return bytes(encoding)
    except keystone.KsError as e:
        print("ERROR: %s" %e)
        return b""

In [3]:
tracepath = "1.trace64"

trace = Supertrace.parse_x64dbg_trace(tracepath)
record = trace.getRecord()
print(f"trace instruction num: {len(record)}")

trace instruction num: 110684


In [4]:
ctx = triton.TritonContext()
ctx.setArchitecture(triton.ARCH.X86_64)
ctx.setMode(triton.MODE.ALIGNED_MEMORY, True)
ctx.setMode(triton.MODE.AST_OPTIMIZATIONS, True)
ctx.setMode(triton.MODE.CONSTANT_FOLDING, True)

def newLiftDot(expr_node) -> str:
    temp_ctx = triton.TritonContext()
    temp_ctx.setArchitecture(ctx.getArchitecture())
    return temp_ctx.liftToDot(expr_node)

def initTritonCtxEnvSym(
    ctx: triton.TritonContext,
    init_refer_ins: Supertrace.InstructionRecord,
    teb_addr: int, symbolizeDF: bool = False, symbolizeTF: bool = False, symbolizeIF: bool = False):

    initTritonCtxEnv(ctx, init_refer_ins, teb_addr) # 初始化具体化环境
    
    regs = ctx.registers
    if (ctx.getArchitecture() == triton.ARCH.X86):
        init_regdump = init_refer_ins.reg_dump32
    elif ctx.getArchitecture() == triton.ARCH.X86_64:
        init_regdump = init_refer_ins.reg_dump64
    else:
        print("unsurport arch")

    # 符号化所有寄存器
    for ttreg in ctx.getAllRegisters():
        ctx.symbolizeRegister(ttreg, ttreg.getName())

    # 平坦模型
    ctx.setConcreteRegisterValue(ctx.registers.ss, 0)
    ctx.setConcreteRegisterValue(ctx.registers.ds, 0)
    ctx.setConcreteRegisterValue(ctx.registers.es, 0)
    ctx.setConcreteRegisterValue(ctx.registers.cs, 0)
    
    if ctx.getArchitecture() == triton.ARCH.X86:
        ctx.setConcreteRegisterValue(ctx.registers.esp, init_regdump.regcontext.csp)
        ctx.setConcreteRegisterValue(ctx.registers.eip, init_regdump.regcontext.cip)
        ctx.setConcreteRegisterValue(regs.fs, teb_addr)
        ctx.setConcreteRegisterValue(regs.gs, 0) # 平坦模型
    elif ctx.getArchitecture() == triton.ARCH.X86_64:
        ctx.setConcreteRegisterValue(ctx.registers.rsp, init_regdump.regcontext.csp)
        ctx.setConcreteRegisterValue(ctx.registers.rip, init_regdump.regcontext.cip)
        ctx.setConcreteRegisterValue(regs.fs, 0) # 平坦模型
        ctx.setConcreteRegisterValue(regs.gs, teb_addr)

    if (symbolizeDF):
        ctx.setConcreteRegisterValue(ctx.registers.df, init_regdump.flags.d)
    if (symbolizeTF):
        ctx.setConcreteRegisterValue(ctx.registers.tf, init_regdump.flags.t)
    if (symbolizeIF):
        ctx.setConcreteRegisterValue(getattr(ctx.registers, "if"), init_regdump.flags.i)

    ctx.setConcreteRegisterValue(ctx.registers.nt, 0)
    ctx.setConcreteRegisterValue(ctx.registers.rf, 0)
    ctx.setConcreteRegisterValue(ctx.registers.id, 0)
    ctx.setConcreteRegisterValue(ctx.registers.vip, 0)
    ctx.setConcreteRegisterValue(ctx.registers.vif, 0)
    ctx.setConcreteRegisterValue(ctx.registers.ac, 0)
    ctx.setConcreteRegisterValue(ctx.registers.vm, 0)

In [5]:
record = mergeRepeatIns(ctx, record, True)
print(f"after mergeing 'rep' instructions, trace instruction num: {len(record)}")

after mergeing 'rep' instructions, trace instruction num: 110684


In [6]:
threads = trace.user.meta.getThreads()
for th in threads:
    if th.id == record[0].thread_id:
        main_thread = th
        break
print(f"main thread id: {main_thread.id} ({hex(main_thread.id)})")
print(f"teb: {hex(main_thread.teb)}")
initTritonCtxEnv(ctx, record[0], main_thread.teb)

main thread id: 33076 (0x8134)
teb: 0xccd10d8000


---

In [7]:
class ST_BasicBlock():
    def __init__(self, instructions: list[Supertrace.InstructionRecord]):
        self.instructions: list[Supertrace.InstructionRecord] = instructions
        
        # 针对基本块最后一条指令而言
        self.isControlflow: bool        = None
        self.isBranch: bool             = None

        self.isUnconditionalJump: bool  = None
        self.isConditionalJump: bool    = None
        self.isCall: bool               = None
        self.isRet: bool                = None

        self.isDirect: bool             = None

        self.targetAddr: int            = None
        self.fallThroughAddr: int       = None

        # (indirectTargetAddrs 不是正式的控制流意义字段，因为其数据来源于跟踪记录而非指令本身，因此按照原则，writeBBControlIns 函数不会处理它)
        self.indirectTargetAddrs: set[int]        = set()

    def __hash__(self):
        h: int = 0
        for ins in self.instructions:
            h1 = hash(ins.bytes)
            h2 = hash(ins.ins_address)
            h += (h1 + h2)
        h += hash(len(self.instructions))
        return h

    def __eq__(self, other):
        if not isinstance(other, ST_BasicBlock):
            return NotImplemented
        if (len(self.instructions) != len(other.instructions)):
            return False
        for i, ins in enumerate(self.instructions):
            if (ins.bytes != other.instructions[i].bytes):
                return False
            if (ins.ins_address != other.instructions[i].ins_address):
                return False
        return True
    
    def __repr__(self):
        return f"<{hex(self.getCurrentAddr())}> insCount: {len(self.instructions)}"
    
    def printbb(self):
        '''打印基本块里的所有指令'''
        for ins in self.instructions:
            print(f"{ins}\t{ins.ttins}")
    
    def getFirstIns(self) -> Supertrace.InstructionRecord:
        '''获取基本块的第一条指令'''
        return self.instructions[0]
    
    def getLastIns(self) -> Supertrace.InstructionRecord:
        '''获取基本块的最后一条指令'''
        return self.instructions[-1]
    
    def getCurrentAddr(self) -> int:
        '''获取当前基本块的地址'''
        return self.getFirstIns().ins_address
    
    def isReachMe(self, jumpAddr: int) -> bool:
        '''判断跳转地址是否能进入此基本块'''
        return jumpAddr == self.getFirstIns().ins_address
    
    def findSubIns(self, checkIns: Supertrace.InstructionRecord) -> int:
        '''如果包含，返回对应的索引位置，否则返回 -1'''
        for i, ins in enumerate(self.instructions):
            if (ins == checkIns):
                return i
        return -1
    
    def isContainsAddr(self, checkAddr: int) -> int:
        '''如果包含，返回对应的索引位置，否则返回 -1'''
        for i, ins in enumerate(self.instructions):
            if (ins.ins_address == checkAddr):
                return i
        return -1
    
    def findSubBasicBlock(self, otherBB) -> int:
        '''如果包含，返回对应在开头处的索引位置，否则返回 -1'''
        otherBB: ST_BasicBlock = otherBB
        
        if (self == otherBB): # 走上面的 __eq__ 方法
            return 0
        if (len(self.instructions) < len(otherBB.instructions)):
            return -1
        startIdx = self.findSubIns(otherBB.instructions[0])
        if (startIdx == -1):
            return -1
        
        i = startIdx
        for otherBBIns in otherBB.instructions:
            if (self.instructions[i] != otherBBIns):
                return -1
            i += 1

        return startIdx

def writeBBControlIns(bb: ST_BasicBlock, ttins: triton.Instruction):
    bb.isControlflow        = False
    bb.isBranch             = False

    bb.isUnconditionalJump  = False
    bb.isConditionalJump    = False
    bb.isCall               = False
    bb.isRet                = False

    bb.isDirect             = False

    bb.targetAddr           = 0
    bb.fallThroughAddr      = 0
    # ---------
    bb.isControlflow = ttins.isControlFlow()
    if (bb.isControlflow):
        ttinsType = ttins.getType()

        bb.isCall = (ttinsType == triton.OPCODE.X86.CALL)
        bb.isRet = (ttinsType == triton.OPCODE.X86.RET)
        bb.isBranch = ttins.isBranch()
        if (bb.isBranch):
            bb.isUnconditionalJump = (ttinsType == triton.OPCODE.X86.JMP)
            bb.isConditionalJump = (not bb.isUnconditionalJump)
        if (bb.isCall or bb.isBranch): # ret指令绝对不会是直接跳转
            ops = ttins.getOperands()
            bb.isDirect = (len(ops) == 1 and ops[0].getType() == triton.OPERAND.IMM)
            if (bb.isDirect):
                immop: triton.Immediate = ops[0]
                if (ttins.getAddress() == 0):
                    print("[警告] writeBBControlIns(): ttins.getAddress() 为 0")
                bb.targetAddr = immop.getValue()
    bb.fallThroughAddr = ttins.getNextAddress()

In [8]:
bbs: set[ST_BasicBlock] = set()

def FindBB(bbs: set[ST_BasicBlock], bbaddr: int) -> ST_BasicBlock:
    for bb in bbs:
        if (bb.isReachMe(bbaddr)):
            return bb
    return None

In [9]:
bbinsList: list[Supertrace.InstructionRecord] = []
bbQuickDict = {} # 缓存，用于加快基本块查询速度
for i, ins in enumerate(record):

    ttins = triton.Instruction()
    ttins.setOpcode(ins.bytes)
    ttins.setAddress(ins.ins_address)
    ctx.disassembly(ttins)
    ins.ttins = ttins

    bbinsList.append(ins)
    if (ttins.isControlFlow()):
        bb = ST_BasicBlock(bbinsList.copy())
        if (bb in bbs):
            find = bbQuickDict[bb.getCurrentAddr()] # 缓存查询。下面不用额外判断find是否正确，上面已经判断好了
            if (checkIndirectIns(ttins) and (i + 1 < len(record))): # 单独填充 indirectTargetAddrs 的信息
                find.indirectTargetAddrs.add(record[i + 1].ins_address)
        else:
            writeBBControlIns(bb, ttins)
            if (checkIndirectIns(ttins) and (i + 1 < len(record))): # 单独填充 indirectTargetAddrs 的信息
                bb.indirectTargetAddrs.add(record[i + 1].ins_address)
            
            bbs.add(bb)
            bbQuickDict[bb.getCurrentAddr()] = bb # 缓存更新

        bbinsList.clear()

In [10]:
print(f"从trace采集到的基本块数量: {len(bbs)}")

从trace采集到的基本块数量: 3577


拆分独立基本块（预处理）

In [11]:
def split_basic_blocks(bbs: set[ST_BasicBlock]) -> tuple[set[ST_BasicBlock], bool]:
    """
    拆分所有存在“包含关系”的 BasicBlock
    保证最终结果中不存在一个 BB 严格包含另一个 BB
    """
    #TODO: 暂时没处理 bb.indirectTargetAddrs
    hasChange = False
    bbs = set(bbs)

    changed = True
    while changed:
        changed = False

        # 按长度排序，只允许 small 切 big
        sorted_bbs = sorted(bbs, key=lambda bb: len(bb.instructions))

        to_remove = set()
        to_add = set()

        for small in sorted_bbs:
            for big in sorted_bbs:
                if small is big:
                    continue

                if len(small.instructions) >= len(big.instructions):
                    continue

                subIdx = big.findSubBasicBlock(small)
                if subIdx == -1:
                    continue

                # ---- 开始拆分 ----
                changed = True
                hasChange = True
                to_remove.add(big)

                instrs = big.instructions

                # 前导基本块
                if subIdx > 0:
                    left = ST_BasicBlock(instrs[:subIdx])
                    writeBBControlIns(left, left.getLastIns().ttins)
                    to_add.add(left)

                # 后导基本块
                endIdx = subIdx + len(small.instructions)
                if endIdx < len(instrs):
                    right = ST_BasicBlock(instrs[endIdx:])
                    writeBBControlIns(right, right.getLastIns().ttins)
                    to_add.add(right)

                break

            if changed:
                break

        if changed:
            bbs.difference_update(to_remove)
            bbs.update(to_add)

    return bbs, hasChange

In [12]:
class BBLinkType(Enum):
    INDIRECT_UNCOND = 0            # 无条件间接跳转
    DIRECT_UNCOND = 1              # 无条件直接跳转
    DIRECT_COND_TAKEN = 2          # 有条件直接跳转(跳转已发生)
    DIRECT_COND_NOTTAKEN = 3       # 有条件直接跳转(跳转未发生)
    DIRECT_FALLTHROUGH = 4         # 顺序接续跳转(用于无控制流指令的基本块)

    OBF_JUMP = 5                   # 混淆跳转

class BBLink():
    def __init__(self, linkType: BBLinkType, linkAddress):
        self.type = linkType
        self.addr = linkAddress
    def __hash__(self):
        h: int = 0
        h += hash(self.type.name)
        h += hash(hex(self.addr))
        return h
    def __eq__(self, other):
        if not isinstance(other, BBLink):
            return NotImplemented
        if (other.type != self.type): return False
        if (other.addr != self.addr): return False
        return True
    def __repr__(self):
        return f"type: {self.type}\taddr: {hex(self.addr)}"

合并基本块（jmp imm 和 call imm、无控制流指令基本块以及被证实为某些混淆控制流的基本块）

In [13]:
def merge_basic_block(bbs: set[ST_BasicBlock]) -> tuple[set[ST_BasicBlock], bool]:
    hasChange = False
    bbs = set(bbs)
    
    def mergeBasicBlock(a: ST_BasicBlock, b: ST_BasicBlock) -> ST_BasicBlock:
        '''将基本块b追加到a的后面，新的基本块继承b的属性'''
        newbb = ST_BasicBlock(a.instructions + b.instructions)
        writeBBControlIns(newbb, b.getLastIns().ttins)
        newbb.indirectTargetAddrs = b.indirectTargetAddrs
        return newbb

    def getPrec(precAddr: int, checkAddr: int) -> BBLink:
        for prec in precursors[precAddr]:
            if (prec.addr == checkAddr):
                return prec
        return None

    precursors = defaultdict(set[BBLink]) # 前驱
    successors = defaultdict(set[BBLink]) # 后继

    for bb in bbs:
        current = bb.getCurrentAddr()
        if (bb.isControlflow):
            if (bb.isDirect):

                if (bb.isConditionalJump): # 针对有条件跳转
                    if not (hasattr(bb, "obfOPStatus") and bb.obfOPStatus == True): # ObfOP 反混淆
                        taken1 = BBLink(BBLinkType.DIRECT_COND_TAKEN, bb.targetAddr)
                        taken2 = BBLink(BBLinkType.DIRECT_COND_NOTTAKEN, bb.fallThroughAddr)
                        target = set({taken1, taken2})
                    else:
                        if (bb.obfOPCondTaken):
                            taken = BBLink(BBLinkType.DIRECT_UNCOND, bb.targetAddr)
                        else:
                            taken = BBLink(BBLinkType.DIRECT_FALLTHROUGH, bb.fallThroughAddr)
                        target = set({taken})
                else: # 针对无条件跳转 到这里的基本块的跳转指令是 jmp imm 或 call imm
                    taken = BBLink(BBLinkType.DIRECT_UNCOND, bb.targetAddr)
                    target = set({taken})

            else: # 间接跳转
                if not (hasattr(bb, "obfIndirectStatus") and bb.obfIndirectStatus == True): # ObfIndirect 反混淆
                    target = set()
                    for indirectTargetAddr in bb.indirectTargetAddrs:
                        taken = BBLink(BBLinkType.INDIRECT_UNCOND, indirectTargetAddr)
                        target.add(taken)
                else:
                    taken = BBLink(BBLinkType.DIRECT_UNCOND, bb.obfIndirectAddr)
                    target = set({taken})
        else:
            # 到这里的基本块是没有控制流指令的
            taken = BBLink(BBLinkType.DIRECT_FALLTHROUGH, bb.fallThroughAddr)
            target = set({taken})
        
        successors[current] = successors[current].union(target)
        for taken in target:
            precursors[taken.addr].add(BBLink(taken.type, current))

    goNext = True
    while(goNext):
        goNext = False

        for bb in bbs:
            current = bb.getCurrentAddr()

            for succ in successors[current]:

                target = succ.addr
                
                succCheck = (len(successors[current]) == 1) and \
                    (succ.type == BBLinkType.DIRECT_UNCOND or succ.type == BBLinkType.DIRECT_FALLTHROUGH)
                
                precCheck = (target in precursors) and \
                    (len(precursors[target]) == 1) and \
                    (getPrec(target, current) != None) and \
                    (getPrec(target, current).type == BBLinkType.DIRECT_UNCOND or getPrec(target, current).type == BBLinkType.DIRECT_FALLTHROUGH)

                if (succCheck and precCheck):

                    targetBB = FindBB(bbs, target)
                    if (targetBB == None):
                        print(f"异常! target: {hex(target)} 不存在 bb: {bb}")
                        continue

                    mergebb = mergeBasicBlock(bb, targetBB)

                    mergeAddr = mergebb.getCurrentAddr()
                    assert(mergeAddr == current)

                    successors[mergeAddr] = successors[target].copy()
                    successors[target].clear()
                    
                    precursors[target].clear()
                    for newSucc in successors[mergeAddr]:
                        removePrec = getPrec(newSucc.addr, target)
                        assert(removePrec != None)
                        precursors[newSucc.addr].remove(removePrec)

                        precursors[newSucc.addr].add(BBLink(newSucc.type, mergeAddr))

                    bbs.remove(targetBB)
                    bbs.remove(bb)
                    bbs.add(mergebb)
                    
                    goNext = True
                    hasChange = True
                    break
            if (goNext):
                break

    return bbs, hasChange

消除不透明谓词 (ObfOP)

In [14]:
def checkOpaquePredicate(bb: ST_BasicBlock):
    insList = bb.instructions

    tmpctx = triton.TritonContext()
    tmpctx.setArchitecture(ctx.getArchitecture())
    tmpctx.setMode(triton.MODE.ALIGNED_MEMORY, True)
    tmpctx.setMode(triton.MODE.AST_OPTIMIZATIONS, True)
    tmpctx.setMode(triton.MODE.CONSTANT_FOLDING, True)
    tmpctx.setMode(triton.MODE.SYMBOLIZE_INDEX_ROTATION, True)
    tmpctx.setMode(triton.MODE.PC_TRACKING_SYMBOLIC, False)
    # 不建议开启符号内存，这会导致求解时间显著增长
    tmpastctx = tmpctx.getAstContext()

    # 符号化寄存器环境
    initTritonCtxEnvSym(tmpctx, bb.getFirstIns(), main_thread.teb)

    for i, ins in enumerate(insList):
        ttins = triton.Instruction()
        ttins.setAddress(ins.ins_address)
        ttins.setOpcode(ins.bytes)

        for memAcc in ins.mem_accs:
            if (memAcc.type != Supertrace.AccessType.READ):
                continue
            for i in range(memAcc.acc_size):
                memi = triton.MemoryAccess(memAcc.acc_address + i, triton.CPUSIZE.BYTE)
                if (not tmpctx.isConcreteMemoryValueDefined(memi)):
                    oldby = (memAcc.old_data >> (i * 8)) & 0xFF
                    tmpctx.setConcreteMemoryValue(memi, oldby)
                    tmpctx.symbolizeMemory(memi, hex(memAcc.acc_address + i))

        tmpctx.processing(ttins)

        if ((i == len(insList) - 1) and ttins.isBranch() and ttins.getType() != triton.OPCODE.X86.JMP): # 筛选JCC有条件跳转指令
            if (hasattr(ins, "obfOPStatus") and ins.obfOPStatus == True): # 已经证明过了，跳过此次证明过程
                continue
            ins.obfOPStatus = False
            ins.obfOPCondTaken = False

            pathPredicate = tmpctx.getPathPredicate()

            satCount = 0
            if (pathPredicate.isSymbolized()):
                if (tmpctx.isSat(pathPredicate)):
                    satCount += 1
                if (tmpctx.isSat(tmpastctx.lnot(pathPredicate))):
                    satCount += 1
            else:
                satCount = 1

            if (satCount == 1):
                # 是不透明谓词
                ins.obfOPStatus = True
                ins.obfOPCondTaken = (ttins.getNextAddress() != record[ins.id + 1].ins_address)
                # print(f"{hex(ins.ins_address)} 是不透明谓词！跳转情况: {ins.obfOPCondTaken} {ttins}")

def prove_opaque_predicate(bbs: set[ST_BasicBlock]) -> tuple[set[ST_BasicBlock], bool]:
    hasChange = False
    bbs = set(bbs)

    for bb in bbs: # 在目前的实现中，只关注基本块的最后一条指令是JCC的情况
        if (bb.isConditionalJump):
            lastIns = bb.getLastIns()
            if (hasattr(lastIns, "obfOPStatus") and lastIns.obfOPStatus == True):
                continue # 已证明过，跳过
            checkOpaquePredicate(bb)
            if (hasattr(lastIns, "obfOPStatus") and lastIns.obfOPStatus == True):
                # print(f"{hex(bb.getCurrentAddr())} 存在OP: {lastIns.ttins}")
                # 将指令的不透明谓词的信息保存到基本块上
                bb.obfOPStatus = lastIns.obfOPStatus
                bb.obfOPCondTaken = lastIns.obfOPCondTaken
                hasChange = True

    return bbs, hasChange

间接跳转混淆消除（ObfIndirect）

In [15]:
def checkObfIndirect(bb: ST_BasicBlock):
    insList = bb.instructions

    tmpctx = triton.TritonContext()
    tmpctx.setArchitecture(ctx.getArchitecture())
    tmpctx.setMode(triton.MODE.ALIGNED_MEMORY, True)
    tmpctx.setMode(triton.MODE.AST_OPTIMIZATIONS, True)
    tmpctx.setMode(triton.MODE.CONSTANT_FOLDING, True)
    tmpctx.setMode(triton.MODE.SYMBOLIZE_INDEX_ROTATION, True)
    # 不要开启符号内存，会丢失正确识别!

    # 符号化寄存器环境
    initTritonCtxEnvSym(tmpctx, bb.getFirstIns(), main_thread.teb)

    for i, ins in enumerate(insList):
        ttins = triton.Instruction()
        ttins.setAddress(ins.ins_address)
        ttins.setOpcode(ins.bytes)

        for memAcc in ins.mem_accs:
            if (memAcc.type != Supertrace.AccessType.READ):
                continue
            for i in range(memAcc.acc_size):
                memi = triton.MemoryAccess(memAcc.acc_address + i, triton.CPUSIZE.BYTE)
                if (not tmpctx.isConcreteMemoryValueDefined(memi)):
                    oldby = (memAcc.old_data >> (i * 8)) & 0xFF
                    tmpctx.setConcreteMemoryValue(memi, oldby)
                    tmpctx.symbolizeMemory(memi, hex(memAcc.acc_address + i))

        tmpctx.processing(ttins)

        if ((i == len(insList) - 1) and checkIndirectIns(ttins)):
            if (hasattr(ins, "obfIndirectStatus") and ins.obfIndirectStatus == True): # 已经证明过了，跳过此次证明过程
                continue
            ins.obfIndirectStatus = False
            ins.obfIndirectAddr = 0
            if (not ttins.isSymbolized()):
                ins.obfIndirectStatus = True
                ins.obfIndirectAddr = record[ins.id + 1].ins_address
                # print(f"{hex(ins.ins_address)} 是间接跳转混淆! 地址: {hex(ins.obfIndirectAddr)} {ttins}")

def prove_obfuse_indirect(bbs: set[ST_BasicBlock]) -> tuple[set[ST_BasicBlock], bool]:
    hasChange = False
    bbs = set(bbs)

    for bb in bbs:
        lastIns = bb.getLastIns()
        if (hasattr(lastIns, "obfIndirectStatus") and lastIns.obfIndirectStatus == True):
            continue # 已证明过，跳过
        checkObfIndirect(bb)
        if (hasattr(lastIns, "obfIndirectStatus") and lastIns.obfIndirectStatus == True):
            bb.obfIndirectStatus = lastIns.obfIndirectStatus
            bb.obfIndirectAddr = lastIns.obfIndirectAddr
            hasChange = True

    return bbs, hasChange

内存操作数常量隐藏

In [16]:
def checkMemOpConstHide(bb: ST_BasicBlock) -> bool:
    hasChange = False
    insList = bb.instructions

    tmpctx = triton.TritonContext()
    tmpctx.setArchitecture(ctx.getArchitecture())
    tmpctx.setMode(triton.MODE.ALIGNED_MEMORY, True)
    tmpctx.setMode(triton.MODE.AST_OPTIMIZATIONS, True)
    tmpctx.setMode(triton.MODE.CONSTANT_FOLDING, True)
    tmpctx.setMode(triton.MODE.SYMBOLIZE_INDEX_ROTATION, True)

    initTritonCtxEnvSym(tmpctx, bb.getFirstIns(), main_thread.teb) # 初始化具体化环境
    tmpctx.symbolizeRegister(tmpctx.registers.rsp, tmpctx.registers.rsp.getName())

    for i, ins in enumerate(insList):
        ttins = triton.Instruction()
        ttins.setAddress(ins.ins_address)
        ttins.setOpcode(ins.bytes)

        tmpctx.disassembly(ttins)

        ops = ttins.getOperands()
        for op in ops:
            if (op.getType() != triton.OPERAND.MEM):
                continue
            memop: triton.MemoryAccess = op
            baseReg = memop.getBaseRegister()
            indexReg = memop.getIndexRegister()
            ins.obfMemConstHideStatus = False
            ins.obfMemConstHideDetail = set() # (TTRegID, TTRegName, RealValue)
            if (tmpctx.isRegisterValid(baseReg) and tmpctx.isRegisterValid(indexReg) and baseReg == indexReg and (not tmpctx.isRegisterSymbolized(baseReg))): # 如果是相同寄存器
                if (tmpctx.isRegisterValid(baseReg) and not tmpctx.isRegisterSymbolized(baseReg)):
                    hasChange = True
                    ins.obfMemConstHideStatus = True
                    real = tmpctx.getConcreteRegisterValue(baseReg)
                    ins.obfMemConstHideDetail.add( (baseReg.getId(), baseReg.getName(), real))
                    # print(f"{hex(ins.dbg_id)} {ins.ttins}\t\t的内存操作数的 {baseReg.getName()} 寄存器是常量 {hex(real)}")
            else:
                if (tmpctx.isRegisterValid(baseReg) and not tmpctx.isRegisterSymbolized(baseReg)):
                    hasChange = True
                    ins.obfMemConstHideStatus = True
                    real = tmpctx.getConcreteRegisterValue(baseReg)
                    ins.obfMemConstHideDetail.add( (baseReg.getId(), baseReg.getName(), real))
                    # print(f"{hex(ins.dbg_id)} {ins.ttins}\t\t的内存操作数的 {baseReg.getName()} 寄存器是常量 {hex(real)}")
                if (tmpctx.isRegisterValid(indexReg) and not tmpctx.isRegisterSymbolized(indexReg)):
                    hasChange = True
                    ins.obfMemConstHideStatus = True
                    real = tmpctx.getConcreteRegisterValue(indexReg)
                    ins.obfMemConstHideDetail.add( (indexReg.getId(), indexReg.getName(), real))
                    # print(f"{hex(ins.dbg_id)} {ins.ttins}\t\t的内存操作数的 {indexReg.getName()} 寄存器是常量 {hex(real)}")

        for memAcc in ins.mem_accs:
            if (memAcc.type != Supertrace.AccessType.READ):
                continue
            for i in range(memAcc.acc_size):
                memi = triton.MemoryAccess(memAcc.acc_address + i, triton.CPUSIZE.BYTE)
                if (not tmpctx.isConcreteMemoryValueDefined(memi)):
                    oldby = (memAcc.old_data >> (i * 8)) & 0xFF
                    tmpctx.setConcreteMemoryValue(memi, oldby)
                    tmpctx.symbolizeMemory(memi, hex(memAcc.acc_address + i))

        tmpctx.buildSemantics(ttins)

    return hasChange

def prove_obfuse_memopConstHide(bbs: set[ST_BasicBlock]) -> tuple[set[ST_BasicBlock], bool]:
    hasChange = False
    bbs = set(bbs)

    for bb in bbs:
        if (checkMemOpConstHide(bb)):
            hasChange = True

    return bbs, hasChange

In [17]:
# 进行预处理
print(f"预处理前基本块数量: {len(bbs)}")
# bbs, _ = split_basic_blocks(bbs)
print(f"预处理后基本块数量: {len(bbs)}")

预处理前基本块数量: 3577
预处理后基本块数量: 3577


In [18]:
def drawBBLinkGraph(bbs: set[ST_BasicBlock], createUnkNodeWhenBBInvalid: bool = False):
    '''createUnkNodeWhenBBInvalid: 检查目标节点是否存在，若不存在则根据此参数创建对应的UNK节点再绘制edge，否则取消edge绘制。'''
    g = graphviz.Digraph('BasicBlockLinkGraph', graph_attr={'rankdir': 'LR'})

    valids = set() # 用于快速检查某一个基本块是否存在
    for bb in bbs:
        node = bb.getCurrentAddr()
        valids.add(node)
        g.node(hex(node), label=f'{hex(node)}: {len(bb.instructions)}', color="")

    precursors = defaultdict(set[BBLink]) # 前驱
    successors = defaultdict(set[BBLink]) # 后继

    for bb in bbs:
        current = bb.getCurrentAddr()
        if (bb.isControlflow):
            if (bb.isDirect):

                if (bb.isConditionalJump): # 针对有条件跳转
                    if not (hasattr(bb, "obfOPStatus") and bb.obfOPStatus == True): # ObfOP 反混淆
                        taken1 = BBLink(BBLinkType.DIRECT_COND_TAKEN, bb.targetAddr)
                        taken2 = BBLink(BBLinkType.DIRECT_COND_NOTTAKEN, bb.fallThroughAddr)
                        target = set({taken1, taken2})
                    else:
                        if (bb.obfOPCondTaken):
                            taken = BBLink(BBLinkType.OBF_JUMP, bb.targetAddr)
                        else:
                            taken = BBLink(BBLinkType.OBF_JUMP, bb.fallThroughAddr)
                        target = set({taken})
                else: # 针对无条件跳转 到这里的基本块的跳转指令是 jmp imm 或 call imm
                    taken = BBLink(BBLinkType.DIRECT_UNCOND, bb.targetAddr)
                    target = set({taken})

            else: # 间接跳转
                if not (hasattr(bb, "obfIndirectStatus") and bb.obfIndirectStatus == True): # ObfIndirect 反混淆
                    target = set()
                    for indirectTargetAddr in bb.indirectTargetAddrs:
                        taken = BBLink(BBLinkType.INDIRECT_UNCOND, indirectTargetAddr)
                        target.add(taken)
                else:
                    taken = BBLink(BBLinkType.OBF_JUMP, bb.obfIndirectAddr)
                    target = set({taken})
        else:
            # 到这里的基本块是没有控制流指令的
            taken = BBLink(BBLinkType.DIRECT_FALLTHROUGH, bb.fallThroughAddr)
            target = set({taken})
        
        successors[current] = successors[current].union(target)
        for taken in target:
            precursors[taken.addr].add(BBLink(taken.type, current))

    for succSrc in successors:
        succDests = successors[succSrc]
        for succDestTaken in succDests:
            if (succDestTaken.type == BBLinkType.INDIRECT_UNCOND):
                color, style = "blue", "dashed"
            elif (succDestTaken.type == BBLinkType.DIRECT_UNCOND):
                color, style = "blue", "solid"
            elif (succDestTaken.type == BBLinkType.DIRECT_COND_TAKEN):
                color, style = "green", "solid"
            elif (succDestTaken.type == BBLinkType.DIRECT_COND_NOTTAKEN):
                color, style = "red", "solid"
            elif (succDestTaken.type == BBLinkType.DIRECT_FALLTHROUGH):
                color, style = "grey", "solid"
            elif (succDestTaken.type == BBLinkType.OBF_JUMP):
                color, style = "pink", "solid"
            else:
                raise RuntimeError()

            if (succDestTaken.addr in valids):
                g.edge(hex(succSrc), hex(succDestTaken.addr), color=color, style=style)
            else:
                if (createUnkNodeWhenBBInvalid):
                    g.edge(hex(succSrc), hex(succDestTaken.addr), color=color, style=style)
                    g.node(hex(succDestTaken.addr), label=f'UNK_{hex(succDestTaken.addr)}', color="red")

    return g

In [19]:
# drawBBLinkGraph(bbs) # 绘制基本块链接图

In [20]:
# 反混淆迭代循环
keepRun = True
runCount = 0
processors: list = [merge_basic_block, prove_opaque_predicate, prove_obfuse_indirect]
while(keepRun):
    runCount += 1
    keepRun = False

    for processor in processors:
        hasChange = False
        bbs, hasChange = processor(bbs)
        if (hasChange): 
            keepRun = True

print(f"反混淆循环结束! 共周转执行 {runCount} 次")
print(f"当前基本块数量: {len(bbs)}")

反混淆循环结束! 共周转执行 8 次
当前基本块数量: 763


In [21]:
# drawBBLinkGraph(bbs) # 绘制基本块链接图

In [22]:
def writeBytesX64dbg(insaddr: int, bys: bytes) -> str:
    result = ""
    for i, by in enumerate(bys):
        result += f"memset {hex(insaddr + i)},{hex(by)},0x1"
        if (i != len(bys) - 1):
            result += "\n"
    return result

In [23]:
for ins in record:
    if (hasattr(ins, "obfOPStatus") and ins.obfOPStatus == True):
        print(f"{hex(ins.ins_address)} 是不透明谓词!  真实跳转谓词: {ins.obfOPCondTaken} \t{ins.ttins}")

0x14033fb82 是不透明谓词!  真实跳转谓词: False 	0x14033fb82: jb 0x1402d2470
0x140292375 是不透明谓词!  真实跳转谓词: True 	0x140292375: jae 0x1402ec76b
0x1402ac3f2 是不透明谓词!  真实跳转谓词: True 	0x1402ac3f2: jle 0x14046a1e2
0x1401a38e3 是不透明谓词!  真实跳转谓词: False 	0x1401a38e3: je 0x140294c36
0x140215845 是不透明谓词!  真实跳转谓词: True 	0x140215845: jbe 0x14017528b
0x1402fa90c 是不透明谓词!  真实跳转谓词: True 	0x1402fa90c: jb 0x140432b3c
0x140432b44 是不透明谓词!  真实跳转谓词: False 	0x140432b44: jae 0x1403615ae
0x14026b2a0 是不透明谓词!  真实跳转谓词: True 	0x14026b2a0: jb 0x1403b1cf9
0x1403b1d49 是不透明谓词!  真实跳转谓词: True 	0x1403b1d49: jnp 0x14015bd79
0x14015bd7d 是不透明谓词!  真实跳转谓词: True 	0x14015bd7d: jge 0x14017528b
0x14018bc3f 是不透明谓词!  真实跳转谓词: True 	0x14018bc3f: je 0x1404253bf
0x1401f4125 是不透明谓词!  真实跳转谓词: True 	0x1401f4125: jp 0x1403911cc
0x14039120e 是不透明谓词!  真实跳转谓词: True 	0x14039120e: js 0x140316402
0x14031641b 是不透明谓词!  真实跳转谓词: True 	0x14031641b: js 0x1402910b3
0x1402910c4 是不透明谓词!  真实跳转谓词: True 	0x1402910c4: jl 0x14021a1d6
0x1401ad346 是不透明谓词!  真实跳转谓词: True 	0x1401ad346

In [None]:
# 生成不透明谓词的x64dbg还原脚本
for ins in record:
    if (hasattr(ins, "obfOPStatus") and ins.obfOPStatus == True):
        print(f"memset {hex(ins.ins_address)},0x90,{hex(len(ins.bytes))}")
        if (ins.obfOPCondTaken):
            ops = ins.ttins.getOperands()
            immop: triton.Immediate = ops[0]
            writestr = f"jmp {hex(immop.getValue())}"
            bys = asmDecode(writestr, ins.ins_address)
            print(writeBytesX64dbg(ins.ins_address, bys))

In [25]:
for ins in record:
    if (hasattr(ins, "obfIndirectStatus") and ins.obfIndirectStatus == True):
        print(f"{hex(ins.ins_address)} 是间接跳转混淆!  真实跳转地址: {hex(ins.obfIndirectAddr)} \t{ins.ttins}")

0x140401020 是间接跳转混淆!  真实跳转地址: 0x14026b27b 	0x140401020: jmp r8
0x140223ef7 是间接跳转混淆!  真实跳转地址: 0x1402a3eb0 	0x140223ef7: jmp r11
0x14019995b 是间接跳转混淆!  真实跳转地址: 0x14031c143 	0x14019995b: jmp r8
0x1401f5c4d 是间接跳转混淆!  真实跳转地址: 0x140429e53 	0x1401f5c4d: jmp rdx
0x14028baba 是间接跳转混淆!  真实跳转地址: 0x1401b84d1 	0x14028baba: jmp r11
0x140307391 是间接跳转混淆!  真实跳转地址: 0x14015f51f 	0x140307391: jmp rbx
0x140199be7 是间接跳转混淆!  真实跳转地址: 0x140141f4b 	0x140199be7: jmp r8
0x140269424 是间接跳转混淆!  真实跳转地址: 0x14047aed0 	0x140269424: jmp rax
0x14014393c 是间接跳转混淆!  真实跳转地址: 0x1404a857f 	0x14014393c: jmp r11
0x14017b159 是间接跳转混淆!  真实跳转地址: 0x140260e2d 	0x14017b159: jmp rsi
0x1402ff6ce 是间接跳转混淆!  真实跳转地址: 0x1403e8773 	0x1402ff6ce: jmp rbx
0x1401ffc1d 是间接跳转混淆!  真实跳转地址: 0x140382ef7 	0x1401ffc1d: jmp rcx
0x14018df7d 是间接跳转混淆!  真实跳转地址: 0x140377910 	0x14018df7d: jmp rcx
0x1402deff7 是间接跳转混淆!  真实跳转地址: 0x1401f8fd3 	0x1402deff7: jmp r11
0x1401664b7 是间接跳转混淆!  真实跳转地址: 0x14041b2c6 	0x1401664b7: jmp rdx
0x14020e3c8 是间接跳转混淆!  真实跳转地址: 0x140295b92 	

处理 obfMemConstHide

In [32]:
for bb in bbs:
    for ins in bb.instructions:
        if (hasattr(ins, "obfMemConstHideStatus")):
            del ins.obfMemConstHideStatus
        if (hasattr(ins, "obfMemConstHideDetail")):
            del ins.obfMemConstHideDetail
        if (hasattr(ins, "obfMemConstHideRep")):
            del ins.obfMemConstHideRep

In [33]:
bbs, _ = prove_obfuse_memopConstHide(bbs)

In [None]:
# 用于解决与lea指令有关的指令反混淆时所产生的disp32问题
def eval_const_expr(expr, size):
    mask = (1 << size) - 1

    if isinstance(expr, ExprInt):
        return ExprInt(expr.arg & mask, size)

    if isinstance(expr, ExprId):
        return None

    if isinstance(expr, ExprOp):
        op = expr.op
        args = expr.args

        eval_args = []
        for a in args:
            r = eval_const_expr(a, size)
            if r is None:
                return None
            eval_args.append(r.arg)

        try:
            if op == '+':
                val = sum(eval_args)
            elif op == '-':
                val = eval_args[0]
                for x in eval_args[1:]:
                    val -= x
            elif op == '*':
                val = 1
                for x in eval_args:
                    val *= x
            elif op == '<<':
                val = eval_args[0] << eval_args[1]
            elif op == '>>':
                val = eval_args[0] >> eval_args[1]
            elif op == '&':
                val = eval_args[0]
                for x in eval_args[1:]:
                    val &= x
            elif op == '|':
                val = eval_args[0]
                for x in eval_args[1:]:
                    val |= x
            elif op == '^':
                val = eval_args[0]
                for x in eval_args[1:]:
                    val ^= x
            else:
                return None

        except Exception:
            return None

        return ExprInt(val & mask, size)

    return None

def try_fix_lea(ins):
    if ins.name != 'LEA':
        return ins

    dst, src = ins.args

    assert(isinstance(src, ExprMem))
    addrExpr = src.ptr

    const_addr = eval_const_expr(addrExpr, 64)
    if const_addr is None:
        return ins

    # LEA reg, [const] -> MOV reg, const
    new_ins = mn_x86.fromstring(
        f"MOV {dst}, 0x{const_addr.arg:x}",
        None,
        64
    )
    return new_ins


In [35]:
for bb in bbs:
    for ins in bb.instructions:
        if (hasattr(ins, "obfMemConstHideStatus") and ins.obfMemConstHideStatus == True):
            mmins = mn_x86.dis(ins.bytes, 64)
            rep = {}
            for detail in ins.obfMemConstHideDetail:
                bit = ctx.getRegister(detail[0]).getBitSize()
                rep[ExprId(detail[1].upper(), bit)] = ExprInt(detail[2], bit)
            for i, arg in enumerate(mmins.args):
                if (arg.is_mem()):
                    mmins.args[i] = arg.replace_expr(rep)
            try:
                asm = mn_x86.asm(try_fix_lea(mmins))
                ins.obfMemConstHideRep = asm
            except ValueError:
                print(f"{ins} 错误!\t{mmins}")
                continue

<DbgId: 1424d> ip: 140483709 insSize: 7 错误!	LEA        ECX, DWORD PTR [0xBCBA7440 + RCX + 0xFFFFFFFFE980F160]
<DbgId: 15f63> ip: 1402d57d9 insSize: 8 错误!	LEA        R8D, DWORD PTR [R8 + 0xFFFFFFBB + 0xFFFFFFFFEE17C0F3]
<DbgId: 160e5> ip: 1404b3907 insSize: 7 错误!	LEA        EDX, DWORD PTR [0xFFFFFFA4 + RDX + 0xFFFFFFFFAA304A15]
<DbgId: 42f0> ip: 14044c096 insSize: 7 错误!	LEA        EBP, DWORD PTR [0x2AA11BBE + RBP + 0x5A0B917A]
<DbgId: cef2> ip: 14041923f insSize: 8 错误!	LEA        ECX, DWORD PTR [0x168C353D + RCX + 0x7BA8B859]
<DbgId: 3dd1> ip: 14031efb2 insSize: 7 错误!	LEA        EBP, DWORD PTR [RBP + 0x4282A5A4 * 0x2 + 0x1128A245]
<DbgId: 625a> ip: 1401e7287 insSize: 8 错误!	LEA        R9D, DWORD PTR [R9 + 0xFFFFFF86 + 0xFFFFFFFFE2B9D27D]
<DbgId: e721> ip: 140229a65 insSize: 8 错误!	LEA        ECX, DWORD PTR [0xFFFFFFFF6EC1BAC4 + RCX + 0xFFFFFFFFFD396EFD]
<DbgId: 1920e> ip: 14046f933 insSize: 7 错误!	LEA        EDX, DWORD PTR [RDX + 0x12238C1A * 0x4 + 0x662A9E18]
<DbgId: 127d2> ip: 1404503e6 

In [None]:
# 生成内存操作数常量隐藏的x64dbg还原脚本
for bb in bbs:
    for ins in bb.instructions:
        if (hasattr(ins, "obfMemConstHideStatus") and ins.obfMemConstHideStatus == True and hasattr(ins, "obfMemConstHideRep")):
            if (not len(ins.obfMemConstHideRep) > 0):
                # print("数量异常")
                continue
            rep: bytes = ins.obfMemConstHideRep[0] # 第一个默认是最短的
            if (len(rep) > len(ins.bytes)):
                # print("长度不够")
                continue
            print(f"memset {hex(ins.ins_address)},0x90,{hex(len(ins.bytes))}")
            print(writeBytesX64dbg(ins.ins_address, rep))

In [37]:
# 不是混淆的jcc跳转
for bb in bbs:
    for ins in bb.instructions:
        if (ins.ttins.isBranch() and ins.ttins.getType() != triton.OPCODE.X86.JMP):
            if (hasattr(ins, "obfOPStatus") and ins.obfOPStatus == False):
                print(f"{ins}\t{ins.ttins}")

<DbgId: 16b0a> ip: 140434cf7 insSize: 6	0x140434cf7: ja 0x140348b72
<DbgId: 3682> ip: 14016b1e3 insSize: 6	0x14016b1e3: ja 0x140301f12
<DbgId: 3ae> ip: 14015818d insSize: 6	0x14015818d: ja 0x14035a855
<DbgId: 372d> ip: 140174c13 insSize: 6	0x140174c13: ja 0x1403815c9
<DbgId: 7beb> ip: 140145432 insSize: 6	0x140145432: ja 0x140160fca
<DbgId: 36ef> ip: 1404889df insSize: 6	0x1404889df: jae 0x1404c32d1
<DbgId: 36fe> ip: 140249977 insSize: 6	0x140249977: jne 0x14017dd0e
<DbgId: 11451> ip: 14038c2f9 insSize: 6	0x14038c2f9: ja 0x14020dca0
<DbgId: 15ddd> ip: 14022bb95 insSize: 6	0x14022bb95: ja 0x1403a87f5
<DbgId: e5fa> ip: 1404120e3 insSize: 6	0x1404120e3: ja 0x1401817ef
<DbgId: 7884> ip: 1402afc4d insSize: 6	0x1402afc4d: jb 0x14049ec26
<DbgId: 3703> ip: 14017dd21 insSize: 6	0x14017dd21: jb 0x14049057d
<DbgId: 137a6> ip: 14033e6ef insSize: 6	0x14033e6ef: ja 0x14049c1d2
<DbgId: 6a84> ip: 140240360 insSize: 6	0x140240360: ja 0x1401b2017


In [38]:
# 不是混淆的间接跳转
for bb in bbs:
    for ins in bb.instructions:
        if (checkIndirectIns(ins.ttins)):
            if (hasattr(ins, "obfIndirectStatus") and ins.obfIndirectStatus == False):
                print(f"{ins}\t{ins.ttins}")

<DbgId: 12531> ip: 1401a7c1e insSize: 2	0x1401a7c1e: jmp rsi
<DbgId: 10ce5> ip: 1402ca13f insSize: 2	0x1402ca13f: jmp rbp
<DbgId: 430a> ip: 1402c6cd2 insSize: 3	0x1402c6cd2: ret 8
<DbgId: 16b0f> ip: 140344e83 insSize: 3	0x140344e83: jmp r10
<DbgId: 17fe2> ip: 140303df8 insSize: 2	0x140303df8: jmp rdi
<DbgId: 149a7> ip: 14026cfcd insSize: 2	0x14026cfcd: jmp rdi
<DbgId: 8c14> ip: 1402cfab6 insSize: 2	0x1402cfab6: jmp rdi
<DbgId: 184ed> ip: 1402b6c56 insSize: 2	0x1402b6c56: jmp rdi
<DbgId: 7ee8> ip: 1401ba0c9 insSize: 2	0x1401ba0c9: jmp rdi
<DbgId: 18567> ip: 1402ab2a0 insSize: 2	0x1402ab2a0: jmp rdx
<DbgId: 9f81> ip: 140226fdf insSize: 2	0x140226fdf: jmp rdi
<DbgId: 7fb6> ip: 140123e62 insSize: 2	0x140123e62: jmp rdx
<DbgId: 6a85> ip: 1401b2017 insSize: 3	0x1401b2017: jmp r10
<DbgId: d7a> ip: 1403e6536 insSize: 2	0x1403e6536: jmp rdx
<DbgId: b83d> ip: 1402f1218 insSize: 2	0x1402f1218: jmp rdi
<DbgId: 19548> ip: 1402a8b29 insSize: 2	0x1402a8b29: jmp rdi
<DbgId: 1697e> ip: 1404b0f02 insSiz