In [6]:
import os 
import json
from pathlib import Path
from utils.key import *
from utils.ghidra_helper import *
from utils.launcher import HeadlessLoggingPyhidraLauncher
from utils.ghidra_helper import *
from utils.db import *
import os 
from functools import reduce
from operator import mul

In [2]:
# Start Ghidra through the logging launcher from utils.launcher;
# log_path points the launcher at ./launch.log.
launcher = HeadlessLoggingPyhidraLauncher(
    verbose=True,
    log_path='./launch.log',
)
launcher.start()

INFO  Using log config file: jar:file:/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Framework/Generic/lib/Generic.jar!/generic.log4j.xml (LoggingInitialization)  
INFO  Using log file: ./launch.log (LoggingInitialization)  
INFO  Loading user preferences: /home/dingisoul/.ghidra/.ghidra_11.0.1_PUBLIC/preferences (Preferences)  
INFO  Searching for classes... (ClassSearcher)  
INFO  Class search complete (593 ms) (ClassSearcher)  
INFO  Initializing SSL Context (SSLContextInitializer)  
INFO  Initializing Random Number Generator... (SecureRandomFactory)  
INFO  Random Number Generator initialization complete: NativePRNGNonBlocking (SecureRandomFactory)  
INFO  Trust manager disabled, cacerts have not been set (ApplicationTrustManagerFactory)  


In [3]:
# Reuse the Ghidra project created or opened in chapter 1.
# The ghidra/java imports live in this cell rather than the top import cell
# (presumably because they need the launcher to be running first -- confirm).
from pathlib import Path
from java.io import IOException
from ghidra.base.project import GhidraProject

# Where the project lives on disk and what it is called
project_name = "espmatch_project"
project_location = Path('./ghidra_project')
project_location.mkdir(parents=True, exist_ok=True)

# Open the project if possible; an IOException from openProject means we
# fall back to creating it.
try:
    project = GhidraProject.openProject(project_location, project_name, True)
    print(f'Opened project: {project.project.name}')
except IOException:
    project = GhidraProject.createProject(project_location, project_name, False)
    print(f'Created project: {project.project.name}')

INFO  Opening project: /home/dingisoul/dev/FirmFlaw/ghidra_project/espmatch_project (DefaultProject)  
Opened project: espmatch_project


In [4]:
# Maps a program name to the number of times it has been requested so far.
duplicate_program_name = {}
def handle_duplicate(name: str) -> str:
    """Return ``name`` followed by how many times it was requested before.

    The first request for a given name yields ``"<name>0"``, the second
    ``"<name>1"`` and so on. The per-name counter is kept in the
    module-level ``duplicate_program_name`` dict, which this call updates.
    """
    seen_before = duplicate_program_name.setdefault(name, 0)
    duplicate_program_name[name] = seen_before + 1
    return f"{name}{seen_before}"

In [7]:
# Import every firmware image found under ./step2_postSig/esp/ into the
# Ghidra project, run analyzeAll on it, and store one row per function that
# passes filter_func in a freshly created SQLite database. A
# "<file name>,<function count>" line per image is written to a CSV file.
#
# Clean-up is guaranteed: the database connection, the CSV file and each
# imported program are released even when a step raises part-way through.
import sqlite3  # explicit: the top import cell never imports sqlite3 by name
from ghidra.program.flatapi import FlatProgramAPI  # hoisted out of the per-file loop

json_end = '_firminfo.json'
lang =  get_language("Xtensa:LE:32:default")

csv_name = './xtensa_func_database.csv'
if os.path.exists(csv_name):
    os.remove(csv_name)

# Always start from an empty database.
DATABASE = './step2_postSig/esp/bin_func.db'
if os.path.exists(DATABASE):
    print(f'remove {DATABASE}')
    os.remove(DATABASE)

# Suffixes of files in the walked tree that must not be imported: the firminfo
# JSON files, 'noheader' files, and anything ending in 'db' (the database
# above is created inside the walked directory).
_SKIPPED_SUFFIXES = (json_end, 'noheader', 'db')


def _collect_func_rows(program):
    """Build the database rows for the functions of ``program``.

    Only functions accepted by ``filter_func`` produce a row. Each row is
    ``(function name, program name, hash) + inst key + graph key`` where the
    hash is the product of all graph-key entries times ``inst_[0]``.
    """
    rows = []
    for func_ in program.getListing().getFunctions(True):
        if not filter_func(func_):
            continue
        inst_ = get_inst_key(func_)
        graph = get_struct_graph_key(func_)
        # make sure the inst_[0] means the numAddress
        hash_ = reduce(mul, graph, 1) * inst_[0]
        # no duplicate check because every bin is different
        rows.append((func_.getName(), program.getName(), hash_) + inst_ + graph)
    return rows


program = None
num = 0
conn = sqlite3.connect(DATABASE)
try:
    cursor = conn.cursor()
    # the database is new, so create the table and the index on 'hash'
    sql_create_table(cursor,FUNC_KEYS,FUNC_TABLE_NAME)
    sql_create_index(cursor,FUNC_TABLE_NAME,['hash'],'index_hash')
    conn.commit()

    with open(csv_name, 'w') as csv_file:
        for root, dirs, files in os.walk('./step2_postSig/esp/'):
            dir_ = Path(root)
            bin_files = [f for f in files if not f.endswith(_SKIPPED_SUFFIXES)]
            for bin_ in bin_files:
                # NOTE: rebasing the image to the base address stored in its
                # firminfo JSON is currently disabled; images are analyzed at
                # the image base Ghidra assigns on import.
                print(f"\033[31mIter file {dir_ / bin_} at {num}\033[0m")
                program = project.importProgram(dir_ / bin_, lang , get_compiler_spec(lang))
                try:
                    FlatProgramAPI(program).analyzeAll(program)

                    # insert func
                    rows_ = _collect_func_rows(program)
                    sql_insert(cursor, FUNC_KEYS.keys(), rows_, FUNC_TABLE_NAME)
                    conn.commit()

                    func_count = program.getFunctionManager().getFunctionCount()
                    print(f"{program.getName()} insert {len(rows_)} functions at {num}")
                    csv_file.write(f'{bin_},{func_count}\n')
                    print(f"\033[31mAdd {func_count} functions\033[0m")
                    num += 1
                    project.saveAs(program, "/", handle_duplicate(program.getName()), True)
                finally:
                    # remember closing the program to avoid memory usage,
                    # including when analysis, insertion or saving fails
                    project.close(program)
finally:
    conn.close()
# As before, the project itself is closed only after a complete run.
project.close()

WARN  Output not expected by specification restoreRegWindow in
/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Processors/Xtensa/data/languages/xtensa.cspec (SymbolicPropogator)  
WARN  Output not expected by specification swap8 in
/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Processors/Xtensa/data/languages/xtensa.cspec (SymbolicPropogator)  
WARN  Output not expected by specification restore8 in
/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Processors/Xtensa/data/languages/xtensa.cspec (SymbolicPropogator)  
WARN  Output not expected by specification restoreRegWindow in
/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Processors/Xtensa/data/languages/xtensa.cspec (SymbolicPropogator)  
WARN  Output not expected by specification rotateRegWindow in
/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Processors/Xtensa/data/languages/xtensa.cspec (SymbolicPropogator)  
WARN  Output not expected by specification swap8 in
/home/dingisoul/dev/ghidra_11.0.1_PUBLIC/Ghidra/Processors/Xtensa/da

IOPub data rate exceeded.
The Jupyter server will temporarily stop sending output
to the client in order to avoid crashing it.
To change this limit, set the config variable
`--ServerApp.iopub_data_rate_limit`.

Current values:
ServerApp.iopub_data_rate_limit=1000000.0 (bytes/sec)
ServerApp.rate_limit_window=3.0 (secs)



IT-BLT-ATTICFAN_V2.1.bin.fs1285552fs. insert 4607 functions at 44
[31mAdd 5515 functions[0m


In [8]:
# Marker cell: prints END (presumably so it is easy to see that the
# long-running import/analysis cell above has finished -- confirm).
print('END')

END


In [10]:
# Match every function row of the firmware database against the reference
# Xtensa function database. Results are collected per program:
#   all_results[program][function] -> list of [name, program, numAddr, ratio]
#   max_results[program][function] -> dict describing the best match
from utils.match import *
from tqdm import tqdm,trange
# stdlib imports come after the star-import so these names cannot be shadowed
import sqlite3
from contextlib import closing

BIN_FUNC_DB = './step2_postSig/esp/bin_func.db'
BASE_DB = './match_base/xtensa/xtensa_func.db'

all_results = {}
max_results = {}

# A sqlite3 connection used directly in a `with` statement is not closed on
# exit, so closing() is what guarantees both connections are released even
# when the comparison raises part-way through.
with closing(sqlite3.connect(BIN_FUNC_DB)) as conn, \
        closing(sqlite3.connect(BASE_DB)) as conn1:
    cursor = conn.cursor()
    cursor1 = conn1.cursor()

    # the row count is only needed to size the progress bar
    cursor.execute('SELECT COUNT(*) FROM func_table')
    result_len = cursor.fetchone()[0]

    cursor.execute('SELECT * FROM func_table')
    for result_ in tqdm(cursor, total=result_len):
        # del the id
        result_ = result_[1:]
        name_ = result_[func_key_idx('name')]
        program_ = result_[func_key_idx('program')]
        (max_, matches_) = compare_func_db(cursor1, result_)
        if len(matches_) == 0:
            continue
        # only add func name and program name and numAddresses and ratio
        all_results.setdefault(program_, {})[name_] = [
            [match_[0][0], match_[0][1], match_[0][3], match_[1]]
            for match_ in matches_
        ]
        # max results
        max_results.setdefault(program_, {})[name_] = {
            'name': max_[0][0],
            'program': max_[0][1],
            'numAddr': max_[0][3],
            'ratio': max_[1],
        }

100%|█████████████████████████████████████████████████████████████████████████| 148703/148703 [14:46<00:00, 167.83it/s]


In [11]:
import json

# Write both result dictionaries as indented JSON, one file each.
for json_path_, payload_ in (
    ('./esp_func_match.json', all_results),
    ('./esp_func_max_match.json', max_results),
):
    with open(json_path_, 'w') as file:
        json.dump(payload_, file, indent=4)

# One "<program>,<number of matched functions>" line per program.
with open('esp_match_program.csv', 'w') as file:
    for k, v in max_results.items():
        file.write(f'{k},{len(v)}\n')

In [15]:
# Report how many functions the firmware database holds and how many of them
# obtained at least one match (one max_results entry per matched function).
import sqlite3
from contextlib import closing

BIN_FUNC_DB = './step2_postSig/esp/bin_func.db'
# closing() makes sure the connection is released once the count is read
with closing(sqlite3.connect(BIN_FUNC_DB)) as conn:
    cursor = conn.cursor()
    cursor.execute('SELECT COUNT(*) FROM func_table')
    result_len = cursor.fetchone()[0]
print(f'all:{result_len}')

# Accumulate under a dedicated name: binding the total to `sum` would shadow
# the builtin sum() for every cell executed afterwards in this kernel.
matched_total = 0
for v in max_results.values():
    matched_total += len(v)
print(f'sum: {matched_total}')

all:148703
sum: 14236
