-
Notifications
You must be signed in to change notification settings - Fork 0
/
my_debugger.py
168 lines (136 loc) · 6.31 KB
/
my_debugger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
from ctypes import *
from my_debugger_defines import *
# Module-wide handle to kernel32.dll obtained through ctypes' ``windll``
# loader (Windows only). Every Win32 call made by the ``debugger`` class
# below goes through this object.
kernel32 = windll.kernel32
class debugger():
    """Small Windows debugger driven through ``kernel32`` via ctypes.

    An instance can launch a program under debugger control (``load``) or
    attach to a running process (``attach``), pump debug events (``run``),
    read and write the debuggee's memory, and plant software (INT3)
    breakpoints.

    Convention used by the methods: a truthy value on success and ``False``
    on failure.
    """

    # Opcode of the x86 INT3 instruction written by ``bp_set_sw``.
    _INT3 = b"\xCC"

    def __init__(self):
        """Start with no process attached and no breakpoints recorded."""
        self.h_process = None            # handle from open_process()
        self.pid = None                  # PID of the debuggee
        self.debugger_active = False     # run() loops while this is truthy
        self.h_thread = None             # thread handle, valid only during an event
        self.context = None              # CONTEXT of the thread that raised the event
        self.exception_address = None    # address of the most recent exception
        self.software_breakpoints = {}   # address -> original byte (bytes)

    @staticmethod
    def _as_bytes(data):
        """Return *data* as ``bytes``.

        ``str`` input is encoded with latin-1 so that each code point
        0x00-0xFF maps to the byte of the same value (``"\\xCC"`` becomes
        ``b"\\xCC"``); this keeps callers that still pass text working.

        Raises:
            TypeError: if *data* is neither text nor a bytes-like object.
            UnicodeEncodeError: if a ``str`` holds code points above 0xFF.
        """
        if isinstance(data, str):
            return data.encode("latin-1")
        if isinstance(data, (bytes, bytearray, memoryview)):
            return bytes(data)
        raise TypeError("data must be bytes-like or str, not {}".format(
            type(data).__name__))

    @staticmethod
    def _as_pointer(address):
        """Wrap an integer address in ``c_void_p``; pass anything else through.

        Without declared ``argtypes`` ctypes marshals a Python int as a C
        ``int``, which cannot carry a 64-bit address.
        """
        if isinstance(address, int):
            return c_void_p(address)
        return address

    def load(self, path_to_exe):
        """Launch *path_to_exe* as a debuggee of this process.

        On success the PID and a process handle are stored and the debugger
        is marked active, so ``run()`` will service the new process.

        Returns:
            True if the process was created, False otherwise.
        """
        # DEBUG_PROCESS makes the calling process the debugger of the child.
        # (Original author's note: use CREATE_NEW_CONSOLE for a GUI.)
        creation_flags = DEBUG_PROCESS
        startupinfo = STARTUPINFO()
        process_information = PROCESS_INFORMATION()

        # dwFlags 0x1 is STARTF_USESHOWWINDOW, which makes CreateProcess
        # honor wShowWindow.
        # NOTE(review): the original comment said this shows the launched
        # process in a separate window, but wShowWindow 0x0 is SW_HIDE --
        # confirm the intended value.
        startupinfo.dwFlags = 0x1
        startupinfo.wShowWindow = 0x0
        startupinfo.cb = sizeof(startupinfo)

        created = kernel32.CreateProcessA(
            path_to_exe.encode('utf-8'), None, None, None, None,
            creation_flags, None, None,
            byref(startupinfo), byref(process_information))
        if not created:
            print("[*] Error: 0x{:08x}.".format(kernel32.GetLastError()))
            return False

        print("[*] We have successfully launched the process!")
        print("[*] PID: {}".format(process_information.dwProcessId))
        # Record the debuggee so run(), detach() and enumerate_threads()
        # can operate on a process started through load().
        self.pid = process_information.dwProcessId
        self.h_process = self.open_process(self.pid)
        self.debugger_active = True
        return True

    def open_process(self, pid):
        """Return the value of OpenProcess(PROCESS_ALL_ACCESS) for *pid*."""
        return kernel32.OpenProcess(PROCESS_ALL_ACCESS, False, pid)

    def attach(self, pid):
        """Attach to the running process *pid* (an int or numeric string).

        Returns:
            True if the debugger attached, False otherwise.

        Raises:
            ValueError: if *pid* cannot be converted to an int.
        """
        # Convert first: handing a str to an untyped ctypes call would pass
        # a pointer to the text rather than the numeric PID.
        pid = int(pid)
        self.h_process = self.open_process(pid)
        if kernel32.DebugActiveProcess(pid):
            self.debugger_active = True
            self.pid = pid
            return True
        print("[*] Unable to attach to the process.")
        return False

    def run(self):
        """Service debug events until ``debugger_active`` becomes falsy."""
        while self.debugger_active:
            self.get_debug_event()

    def get_debug_event(self):
        """Wait for one debug event, dispatch it, and resume the debuggee.

        While the event is handled ``self.h_thread`` holds an open handle to
        the reporting thread and ``self.context`` its register context (or
        False if it could not be read). The handle is closed and reset to
        None before returning.
        """
        debug_event = DEBUG_EVENT()
        continue_status = DBG_CONTINUE

        if not kernel32.WaitForDebugEvent(byref(debug_event), INFINITE):
            return

        self.h_thread = self.open_thread(debug_event.dwThreadId)
        try:
            self.context = self.get_thread_context(h_thread=self.h_thread)
            print("Event Code: {:d} Thread ID: {:d}".format(
                debug_event.dwDebugEventCode, debug_event.dwThreadId))

            if debug_event.dwDebugEventCode == EXCEPTION_DEBUG_EVENT:
                record = debug_event.u.Exception.ExceptionRecord
                exception = record.ExceptionCode
                self.exception_address = record.ExceptionAddress

                if exception == EXCEPTION_ACCESS_VIOLATION:
                    print("Access Violation Detected.")
                elif exception == EXCEPTION_BREAKPOINT:
                    continue_status = self.exception_handler_breakpoint()
                elif exception == EXCEPTION_GUARD_PAGE:
                    print("Guard Page Access Detected.")
                elif exception == EXCEPTION_SINGLE_STEP:
                    print("Single Stepping.")

            kernel32.ContinueDebugEvent(
                debug_event.dwProcessId, debug_event.dwThreadId,
                continue_status)
        finally:
            # The handle was opened for this event only; release it here so
            # it neither leaks nor lingers as a stale value.
            if self.h_thread:
                kernel32.CloseHandle(self.h_thread)
            self.h_thread = None

    def detach(self):
        """Stop debugging the process identified by ``self.pid``.

        Returns:
            True on success, False if DebugActiveProcessStop failed.
        """
        if kernel32.DebugActiveProcessStop(self.pid):
            print("[*] Finished debugging. Exiting...")
            return True
        print("There was an error")
        return False

    def open_thread(self, thread_id):
        """Open *thread_id* with THREAD_ALL_ACCESS.

        Returns:
            The thread handle, or False if OpenThread returned a null handle.
        """
        h_thread = kernel32.OpenThread(THREAD_ALL_ACCESS, False, thread_id)
        # Compare by value/truthiness: ``is not 0`` tests object identity.
        if h_thread:
            return h_thread
        print("[*] Could not obtain a valid thread handle.")
        return False

    def enumerate_threads(self):
        """List the thread IDs owned by the process ``self.pid``.

        Returns:
            A list of thread IDs, or False if the snapshot could not be
            created.
        """
        snapshot = kernel32.CreateToolhelp32Snapshot(TH32CS_SNAPTHREAD, self.pid)
        # Failure is reported as INVALID_HANDLE_VALUE (-1), never as None;
        # accept both the signed-int and pointer-width spellings of it.
        if snapshot is None or snapshot in (-1, c_void_p(-1).value):
            return False

        thread_entry = THREADENTRY32()
        thread_entry.dwSize = sizeof(thread_entry)
        thread_list = []
        try:
            success = kernel32.Thread32First(snapshot, byref(thread_entry))
            while success:
                # Keep only the entries that belong to the debuggee.
                if thread_entry.th32OwnerProcessID == self.pid:
                    thread_list.append(thread_entry.th32ThreadID)
                success = kernel32.Thread32Next(snapshot, byref(thread_entry))
        finally:
            kernel32.CloseHandle(snapshot)
        return thread_list

    def get_thread_context(self, thread_id=None, h_thread=None):
        """Read the register context of a thread.

        Args:
            thread_id: thread to open when no handle is supplied.
            h_thread: an already-open thread handle. It stays open; the
                caller remains responsible for closing it.

        Returns:
            A CONTEXT populated with CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS,
            or False if the thread could not be opened or queried.
        """
        context = CONTEXT()
        context.ContextFlags = CONTEXT_FULL | CONTEXT_DEBUG_REGISTERS

        opened_here = h_thread is None
        if opened_here:
            h_thread = self.open_thread(thread_id)
        if not h_thread:
            return False

        try:
            if kernel32.GetThreadContext(h_thread, byref(context)):
                return context
            return False
        finally:
            # Only release a handle this method opened itself, and do so on
            # the failure path as well.
            if opened_here:
                kernel32.CloseHandle(h_thread)

    def exception_handler_breakpoint(self):
        """Report a breakpoint exception and return the continue status."""
        print("[*] Inside the breakpoint handler")
        print("Exception Address: 0x{:08x}".format(self.exception_address))
        return DBG_CONTINUE

    def read_process_memory(self, address, length):
        """Read *length* bytes at *address* in the debuggee.

        Returns:
            The data as ``bytes``, or False if ReadProcessMemory failed.
        """
        read_buf = create_string_buffer(length)
        # The API's out-parameter is SIZE_T*, which is pointer-sized.
        count = c_size_t(0)
        if not kernel32.ReadProcessMemory(
                self.h_process, self._as_pointer(address), read_buf,
                c_size_t(length), byref(count)):
            return False
        return read_buf.raw

    def write_process_memory(self, address, data):
        """Write *data* (bytes-like, or latin-1 encodable str) at *address*.

        Returns:
            True if WriteProcessMemory succeeded, False otherwise.
        """
        payload = self._as_bytes(data)
        count = c_size_t(0)
        return bool(kernel32.WriteProcessMemory(
            self.h_process, self._as_pointer(address), c_char_p(payload),
            c_size_t(len(payload)), byref(count)))

    def bp_set_sw(self, address):
        """Plant a software breakpoint (INT3) at *address*.

        The byte that was overwritten is stored in
        ``self.software_breakpoints[address]``. An address that already has
        a breakpoint is left untouched.

        Returns:
            True if a breakpoint is in place at *address*, False if the
            original byte could not be read or the INT3 could not be written.
        """
        print("[*] Setting breakpoint at: 0x{:08x}".format(address))
        if address in self.software_breakpoints:
            return True

        try:
            original_byte = self.read_process_memory(address, 1)
            if original_byte is False:
                return False
            if not self.write_process_memory(address, self._INT3):
                return False
        except (ArgumentError, OverflowError, TypeError, ValueError):
            # Argument marshalling failed; report failure like any other
            # unsuccessful attempt instead of hiding every exception.
            return False

        # Record the breakpoint only once the INT3 is actually in memory.
        self.software_breakpoints[address] = original_byte
        return True

    def func_resolve(self, dll, function):
        """Return the address of *function* exported by the loaded *dll*.

        The module is looked up in the debugger's own process.

        Returns:
            The value of GetProcAddress, or False if the module is not
            loaded.
        """
        handle = kernel32.GetModuleHandleA(dll.encode())
        if not handle:
            return False
        # A module handle from GetModuleHandle is not reference-counted and
        # is not a kernel object handle, so it must not be given to
        # CloseHandle.
        # NOTE(review): with ctypes' default int return type these values
        # are truncated to 32 bits on a 64-bit interpreter -- confirm the
        # target bitness or declare restype/argtypes where kernel32 is set up.
        return kernel32.GetProcAddress(handle, function.encode())