/
requests_raw.py
212 lines (167 loc) · 6.32 KB
/
requests_raw.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
#!/usr/bin/env python3
# coding=utf-8
r"""
用于获取最原始的http响应原文,
即直接socket中读出的原始响应头、未解开 gzip/chunk 的响应体
工作在SSL的上层, SSL对其透明
工作原理:
hook 掉 requests 的底层 http.client 中的 socket,
每次读取 socket 中数据的时候, 都会额外复制一份出来
由于http.client本身会 *丢弃* 原始数据, 所以只能自己把它存下来
理论上除了额外的内存占用和少许的性能损耗外不会有副作用, 也不会有兼容性风险
使用方法:
首先 monkey_patch() 打 patch
这个 patch 可以在任何时候打, 不像 gevent 一样必须在一开始打,
会影响所有在 patch 之后的 requests 请求
然后像正常一样使用 requests 发起请求, 得到 response
使用 get_raw(response) 获取响应原文
限制:
python3.4+
@零日 <chenze.zcz@alibaba-inc.com>
>>> # BEGIN doctest
>>> monkey_patch()
>>> import requests
>>> import gzip
>>>
>>> # get raw gzipped body
>>> r = requests.get("http://example.com")
>>> raw = get_raw(r)
>>> assert isinstance(raw, bytearray)
>>> assert raw.startswith(b"HTTP/1.1 200 OK\r\n")
>>> dec = gzip.decompress(get_body(raw))
>>> assert dec == r.content
>>> assert "<title>Example Domain</title>" in dec.decode("utf-8")
>>> print("source_ip:{} dest_ip:{}".format(*get_ip(r))) # doctest: +ELLIPSIS
source_ip:(..., ...) dest_ip:(..., 80)
>>>
>>> # chunked encoding
>>> r2 = requests.get("https://www.baidu.com")
>>> raw2 = get_raw(r2)
>>> raw2 # doctest: +ELLIPSIS
bytearray(b'HTTP/1.1 200 OK\r\n...)
>>> assert raw2.startswith(b"HTTP/1.1 200 OK\r\n")
>>> assert b"Transfer-Encoding: chunked" in raw2
>>> dec2 = gzip.decompress(decode_chunked(raw2))
>>> assert dec2 == r2.content
>>> assert b"www.baidu.com" in dec2
>>>
>>> # this url will be 302 redirected to http://example.com/
>>> r3 = requests.get("https://httpbin.org/redirect-to?url=http%3A%2F%2Fexample.com%2F")
>>> raw3 = get_raw(r3)
>>> # notice! the intermediate 302 raw content would NOT be record
>>> assert raw3.startswith(b"HTTP/1.1 200 OK\r\n")
>>> # if you want to record the intermediate result, please use `allow_redirects=False`
>>> r4 = requests.get("https://httpbin.org/redirect-to?url=http%3A%2F%2Fexample.com%2F", allow_redirects=False)
>>> raw4 = get_raw(r4)
>>> assert raw4.startswith(b"HTTP/1.1 302 FOUND\r\n")
"""
__all__ = ("monkey_patch", "get_raw", "get_body", "decode_chunked", "get_ip")
import logging
import functools
import http.client
import io
logger = logging.getLogger(__name__)
_already_patched = False
class HookedBufferedReader(io.BufferedReader):
    """A BufferedReader that keeps a copy of every byte it reads.

    Everything read through this object (``readline``, ``read``,
    ``readinto`` and their ``*1`` variants) is also appended to
    ``self.dumped``, so the raw on-the-wire response bytes survive even
    though ``http.client`` itself discards them after parsing.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Accumulated copy of all bytes read from the underlying stream.
        self.dumped = bytearray()

    def _record(self, chunk):
        # `chunk` can be None (non-blocking raw stream with no data ready)
        # or empty (EOF); only record real payload.
        if chunk:
            self.dumped += chunk

    def readline(self, *args, **kwargs):
        result = super().readline(*args, **kwargs)
        self._record(result)
        return result

    def read(self, *args, **kwargs):
        result = super().read(*args, **kwargs)
        self._record(result)
        return result

    def read1(self, *args, **kwargs):
        # http.client / urllib3 may read via read1(); hook it too so no
        # bytes are silently missed.
        result = super().read1(*args, **kwargs)
        self._record(result)
        return result

    def readinto(self, buffer):
        n = super().readinto(buffer)
        if n:
            # Copy only the bytes actually written into the caller's buffer.
            self._record(bytes(memoryview(buffer)[:n]))
        return n

    def readinto1(self, buffer):
        n = super().readinto1(buffer)
        if n:
            self._record(bytes(memoryview(buffer)[:n]))
        return n
def patch_http_client(raw_func):
    """Decorator for ``http.client.HTTPResponse.begin``.

    Swaps the response's socket file object for a HookedBufferedReader so
    that every byte read from the socket is also copied into
    ``self.dumped``.  As a bonus it records the connection's
    ``(source_ip, dest_ip)`` on the response, since requests itself does
    not expose them.
    """
    @functools.wraps(raw_func)
    def new_func(self, *args, **kwargs):
        """:type self: http_client.HTTPResponse"""
        if isinstance(self.fp, (HookedBufferedReader, io.BytesIO)):
            # Already hooked (or a BytesIO-backed mock) -- nothing to do.
            return raw_func(self, *args, **kwargs)
        self._raw_fp = self.fp  # type: io.BufferedReader
        # Also record the endpoints; requests has no built-in way to get them.
        try:
            # Plain HTTP: the socket is reachable directly at this level.
            self.source_ip = self._raw_fp.raw._sock.getsockname()
            self.dest_ip = self._raw_fp.raw._sock.getpeername()
        except (AttributeError, OSError):
            try:
                # HTTPS: the real socket is wrapped one level deeper.
                self.source_ip = self._raw_fp.raw._sock.socket.getsockname()
                self.dest_ip = self._raw_fp.raw._sock.socket.getpeername()
            except (AttributeError, OSError):
                # Best effort only -- never fail the request over this.
                self.source_ip = None
                self.dest_ip = None
        self.fp = HookedBufferedReader(self._raw_fp.raw)
        self.dumped = self.fp.dumped  # type: bytearray
        return raw_func(self, *args, **kwargs)
    return new_func
def monkey_patch():
    """Install the raw-capture hook on ``http.client.HTTPResponse.begin``.

    Idempotent: repeated calls are no-ops.  Unlike gevent, this patch may
    be applied at any point; it affects every request made afterwards.
    """
    global _already_patched
    response_cls = http.client.HTTPResponse
    if _already_patched or hasattr(response_cls, "_original_begin"):
        return
    logger.warning("monkey patching!")
    original_begin = response_cls.begin
    # Keep a handle on the pristine method, then wrap it.
    response_cls._original_begin = original_begin
    response_cls.begin = patch_http_client(original_begin)
    _already_patched = True
def get_raw(resp):
    """Return the raw on-the-wire bytes recorded for a requests Response.

    :type resp: requests.Response
    :rtype: bytearray | None

    Returns None when no raw data was captured (e.g. monkey_patch() was
    not active when the request was made, or the response came from a
    cache/mock without a real socket).
    """
    try:
        return resp.raw._original_response.dumped  # type: bytearray
    except AttributeError:
        # Only the attribute chain can fail here; anything else should surface.
        return None
def get_body(data):
    """Strip the header section from a raw response, returning the body.

    The body is returned exactly as it appeared on the wire (still
    gzipped/chunked if it was).  An empty bytearray is returned when no
    header/body separator is present.

    :type data: bytearray
    :rtype: bytearray
    """
    _headers, separator, body = data.partition(b"\r\n\r\n")
    return body if separator else bytearray()
def decode_chunked(data):
    """Decode a ``Transfer-Encoding: chunked`` payload.

    Accepts either a full raw response (status line + headers + body) or
    just the chunked body, and returns the de-chunked bytes (which may
    still be gzip-compressed).

    based on: http://beezari.livejournal.com/190869.html
    (adapted for python3 compatibility)

    :type data: bytearray
    :rtype: bytearray
    """
    if data.startswith(b"HTTP/"):
        # Full response given: drop the header section first.
        header_end = data.find(b"\r\n\r\n")
        body = data[header_end + 4:] if header_end != -1 else bytearray()
    else:
        body = data
    decoded = bytearray()
    while body:
        crlf = body.find(b"\r\n")
        size = int(body[:crlf], 16)  # chunk-size line is hexadecimal
        if not size:
            # Zero-length chunk terminates the stream (trailers ignored).
            break
        body = body[crlf + 2:]
        decoded += body[:size]
        body = body[size + 2:]  # skip the chunk data and its trailing CRLF
    return decoded
def get_addr(resp):
    """Return the ``(source_ip, dest_ip)`` pair recorded for a response.

    Each element is a ``(host, port)`` tuple, or None when the addresses
    could not be determined at request time.
    """
    original_response = resp.raw._original_response
    return original_response.source_ip, original_response.dest_ip


get_ip = get_addr  # backwards-compatible alias
if __name__ == "__main__":
    # NOTE(review): the module doctests hit live hosts (example.com,
    # www.baidu.com, httpbin.org) -- running this requires network access
    # and the third-party `requests` package.
    import doctest
    doctest.testmod()
    print("doctest passed")