/
os_trace.py
176 lines (141 loc) · 6.03 KB
/
os_trace.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
#!/usr/bin/env python3
import dataclasses
import plistlib
import struct
import tempfile
import typing
from datetime import datetime
from enum import IntEnum
from pathlib import Path
from tarfile import TarFile
from construct import Adapter, Byte, Bytes, Computed, Enum, Int16ul, Int32ul, Optional, RepeatUntil, Struct, this
from pymobiledevice3.exceptions import PyMobileDevice3Exception
from pymobiledevice3.lockdown import LockdownClient
from pymobiledevice3.lockdown_service_provider import LockdownServiceProvider
from pymobiledevice3.services.lockdown_service import LockdownService
from pymobiledevice3.utils import try_decode
# Module-level constants; none of them is referenced by the definitions visible in this module.
# NOTE(review): presumably a read size in bytes — confirm against callers
CHUNK_SIZE = 4096
# strftime-style pattern: hours:minutes:seconds
TIME_FORMAT = '%H:%M:%S'
# NOTE(review): presumably the separator between raw syslog lines — confirm against callers
SYSLOG_LINE_SPLITTER = '\n\x00'
class SyslogLogLevel(IntEnum):
    """Numeric log levels carried by syslog entries."""

    NOTICE = 0
    INFO = 1
    DEBUG = 2
    # value deduced from the Console app
    USER_ACTION = 3
    ERROR = 16  # 0x10
    FAULT = 17  # 0x11
@dataclasses.dataclass
class SyslogLabel:
    """Category/subsystem pair that may accompany a syslog entry."""

    # The declaration order (category, then subsystem) is also the positional
    # order of the generated constructor, so it must not be swapped.
    category: str
    subsystem: str
@dataclasses.dataclass
class SyslogEntry:
    """One parsed syslog record, as yielded by ``OsTraceService.syslog()``."""

    # id of the process the record is attributed to
    pid: int
    # time of the record, already converted to a datetime
    timestamp: datetime
    level: SyslogLogLevel
    image_name: str
    filename: str
    message: str
    # stays None when the record carries no subsystem/category information
    label: typing.Optional[SyslogLabel] = None
class TimestampAdapter(Adapter):
    """
    Convert between a parsed seconds/microseconds structure and ``datetime``.

    The wrapped construct is expected to expose ``seconds`` and ``microseconds`` fields.
    """

    def _decode(self, obj, context, path):
        """Combine the parsed ``seconds`` and ``microseconds`` into a naive local-time ``datetime``."""
        return datetime.fromtimestamp(obj.seconds + (obj.microseconds / 1000000))

    def _encode(self, obj, context, path):
        """
        Return the ``seconds``/``microseconds`` mapping describing ``obj``.

        :param obj: normally the ``datetime`` produced by ``_decode``; a
                    ``'<seconds>.<microseconds>'`` string (the only input the previous
                    implementation could process) is still accepted.
        :return: mapping keyed by the field names of the wrapped structure
        """
        if isinstance(obj, datetime):
            # Drop the fractional part before converting so float rounding can never
            # bump the whole-second value.
            seconds = int(obj.replace(microsecond=0).timestamp())
            microseconds = obj.microsecond
        else:
            seconds, microseconds = map(int, obj.split('.'))
        # NOTE(review): the wrapped struct also contains an unnamed 4-byte field; whether
        # construct can build it without an explicit value has not been verified here.
        return {'seconds': seconds, 'microseconds': microseconds}
# Raw timestamp layout; wrapped by TimestampAdapter inside syslog_t.
timestamp_t = Struct(
    'seconds' / Int32ul,
    # unnamed 4-byte field: consumed while parsing, content not interpreted
    Bytes(4),
    'microseconds' / Int32ul
)
# Binary layout of a single syslog record, parsed by OsTraceService.syslog().
# Unnamed Bytes(n) members are consumed while parsing but their content is not interpreted.
# Underscore-prefixed members hold raw data; the Computed members next to them expose the
# decoded value.
syslog_t = Struct(
    Bytes(9),
    'pid' / Int32ul,
    Bytes(42),
    'timestamp' / TimestampAdapter(timestamp_t),
    Bytes(1),
    # NOTE(review): 0x03 (SyslogLogLevel.USER_ACTION) has no name in this mapping — confirm
    # that this is intended.
    'level' / Enum(Byte, Notice=0, Info=0x01, Debug=0x02, Error=0x10, Fault=0x11),
    Bytes(38),
    # sizes (in bytes) of the variable-length fields read further below
    'image_name_size' / Int16ul,
    'message_size' / Int16ul,
    Bytes(6),
    '_subsystem_size' / Int32ul,
    '_category_size' / Int32ul,
    Bytes(4),
    # filename has no size field: bytes are read up to and including the first zero byte
    '_filename' / RepeatUntil(lambda x, lst, ctx: lst[-1] == 0, Byte),
    # [:-1] drops that terminating zero byte before decoding
    'filename' / Computed(lambda ctx: try_decode(bytearray(ctx._filename[:-1]))),
    '_image_name' / Bytes(this.image_name_size),
    # the last byte of each sized field is dropped as well (presumably a zero terminator — TODO confirm)
    'image_name' / Computed(lambda ctx: try_decode(ctx._image_name[:-1])),
    '_message' / Bytes(this.message_size),
    'message' / Computed(lambda ctx: try_decode(ctx._message[:-1])),
    # Optional: `label` may come back as None, which the consumer checks for
    'label' / Optional(Struct(
        # `this._` refers to the enclosing record, where the two sizes were parsed
        '_subsystem' / Bytes(this._._subsystem_size),
        'subsystem' / Computed(lambda ctx: try_decode(ctx._subsystem[:-1])),
        '_category' / Bytes(this._._category_size),
        'category' / Computed(lambda ctx: try_decode(ctx._category[:-1])),
    )),
)
class OsTraceService(LockdownService):
    """
    Provides API for the following operations:
    * Show process list (process name and pid)
    * Stream syslog lines in binary form with optional filtering by pid.
    * Get old stored syslog archive in PAX format (can be extracted using `pax -r < filename`).
    * The archive contains the contents of the `/var/db/diagnostics` directory
    """

    SERVICE_NAME = 'com.apple.os_trace_relay'
    RSD_SERVICE_NAME = 'com.apple.os_trace_relay.shim.remote'

    def __init__(self, lockdown: LockdownServiceProvider):
        """
        Initialise the service with the service name matching the provider type.

        :param lockdown: a ``LockdownClient`` selects ``SERVICE_NAME``; any other provider
                         selects ``RSD_SERVICE_NAME``
        """
        if isinstance(lockdown, LockdownClient):
            super().__init__(lockdown, self.SERVICE_NAME)
        else:
            super().__init__(lockdown, self.RSD_SERVICE_NAME)

    def get_pid_list(self):
        """Send a ``PidList`` request and return the plist-decoded response."""
        self.service.send_plist({'Request': 'PidList'})

        # ignore first received unknown byte
        self.service.recvall(1)

        response = self.service.recv_prefixed()
        return plistlib.loads(response)

    def create_archive(self, out: typing.IO, size_limit: typing.Optional[int] = None,
                       age_limit: typing.Optional[int] = None, start_time: typing.Optional[int] = None):
        """
        Request a log archive and write the received chunks into ``out``.

        :param out: writable binary file object receiving the archive data
        :param size_limit: sent as ``SizeLimit`` when not None
        :param age_limit: sent as ``AgeLimit`` when not None
        :param start_time: sent as ``StartTime`` when not None
        :raises PyMobileDevice3Exception: on an unexpected marker byte or a non-successful status
        """
        request = {'Request': 'CreateArchive'}
        optional_fields = {'SizeLimit': size_limit, 'AgeLimit': age_limit, 'StartTime': start_time}
        request.update({key: value for key, value in optional_fields.items() if value is not None})

        self.service.send_plist(request)

        # The reads below must never live inside an `assert`: with `python -O` the whole
        # statement is removed, the marker byte stays unread and the stream desynchronises.
        marker = self.service.recvall(1)[0]
        if marker != 1:
            raise PyMobileDevice3Exception(f'unexpected response marker: {marker}')

        status = plistlib.loads(self.service.recv_prefixed()).get('Status')
        if status != 'RequestSuccessful':
            raise PyMobileDevice3Exception(f'Invalid status: {status}')

        while True:
            try:
                magic = self.service.recvall(1)[0]
            except ConnectionAbortedError:
                # the connection being aborted is what ends the archive stream
                break
            if magic != 3:
                raise PyMobileDevice3Exception(f'invalid magic: {magic}')
            out.write(self.service.recv_prefixed(endianity='<'))

    def collect(self, out: str, size_limit: typing.Optional[int] = None, age_limit: typing.Optional[int] = None,
                start_time: typing.Optional[int] = None):
        """
        Collect the system logs into a .logarchive that can be viewed later with tools such as log or Console.

        :param out: directory the received archive is extracted into
        :param size_limit: forwarded to ``create_archive``
        :param age_limit: forwarded to ``create_archive``
        :param start_time: forwarded to ``create_archive``
        """
        with tempfile.TemporaryDirectory() as temp_dir:
            archive_path = Path(temp_dir) / 'foo.tar'
            with open(archive_path, 'wb') as f:
                self.create_archive(f, size_limit=size_limit, age_limit=age_limit, start_time=start_time)
            # Close the archive before the temporary directory is removed.
            # NOTE(review): extractall() trusts the member paths stored in the archive;
            # consider an extraction filter if the archive cannot be trusted.
            with TarFile(archive_path) as tar:
                tar.extractall(out)

    def syslog(self, pid: int = -1) -> typing.Generator[SyslogEntry, None, None]:
        """
        Start a syslog stream and yield every received record as a ``SyslogEntry``.

        :param pid: sent as the ``Pid`` field of the request
                    (NOTE(review): -1 presumably means "all processes" — confirm)
        :raises PyMobileDevice3Exception: when the request is not acknowledged as successful
                                          or a record does not start with the expected marker
        """
        self.service.send_plist({'Request': 'StartActivity', 'MessageFilter': 65535, 'Pid': pid, 'StreamFlags': 60})

        # The response is prefixed by the byte-width of its length field, followed by the
        # little-endian length itself.
        length_length, = struct.unpack('<I', self.service.recvall(4))
        length = int.from_bytes(self.service.recvall(length_length), 'little')
        response = plistlib.loads(self.service.recvall(length))

        if response.get('Status') != 'RequestSuccessful':
            raise PyMobileDevice3Exception(f'got invalid response: {response}')

        while True:
            # Read outside of any `assert` so the marker is consumed even under `python -O`.
            marker = self.service.recvall(1)
            if marker != b'\x02':
                raise PyMobileDevice3Exception(f'unexpected syslog entry marker: {marker!r}')
            length, = struct.unpack('<I', self.service.recvall(4))
            line = self.service.recvall(length)
            entry = syslog_t.parse(line)

            label = None
            if entry.label is not None:
                label = SyslogLabel(subsystem=entry.label.subsystem, category=entry.label.category)

            yield SyslogEntry(pid=entry.pid, timestamp=entry.timestamp, level=SyslogLogLevel(int(entry.level)),
                              image_name=entry.image_name, filename=entry.filename, message=entry.message, label=label)