# client_ssl.py
# A small HTTP/HTTPS GET client written directly on top of the socket and ssl modules.
import socket
import ssl
def parsed_url(url):
    """
    Split a URL into protocol, host, port and path.

    A URL without a scheme is treated as ``http``. A URL without a path gets
    ``/``. A missing or empty port falls back to the protocol default
    (80 for http, 443 for https).

    :param url: URL string such as ``https://g.cn:233/index``
    :return: tuple ``(protocol, host, port, path)`` where ``port`` is an int
    :raises ValueError: if the text after ``:`` in the host is not an integer
    """
    default_ports = {
        'http': 80,
        'https': 443,
    }

    # Detect the scheme and strip ONLY that leading prefix. Splitting on
    # every '://' would cut the URL short when the path or query string
    # itself contains '://' (e.g. ``?next=https://other.site/``).
    protocol = 'http'
    rest = url
    for scheme in default_ports:
        prefix = scheme + '://'
        if url.startswith(prefix):
            protocol = scheme
            rest = url[len(prefix):]
            break

    # Everything from the first '/' onwards is the path; default to '/'.
    host, slash, tail = rest.partition('/')
    path = slash + tail if slash else '/'

    # An explicit port overrides the protocol default.
    port = default_ports[protocol]
    if ':' in host:
        host, _, port_text = host.partition(':')
        # "host:" with nothing after the colon means "use the default port".
        if port_text:
            port = int(port_text)

    return protocol, host, port, path
def socket_by_protocol(protocol, server_hostname=None):
    """
    Create an unconnected socket suitable for the given protocol.

    For ``'http'`` a plain TCP socket is returned. For anything else the
    socket is wrapped in TLS; apart from that wrapping the two are used the
    same way by callers.

    :param protocol: ``'http'`` for a plain socket, otherwise TLS is used
    :param server_hostname: optional host name of the server. When given,
        it is sent via SNI and the server certificate is verified against
        it. When omitted, the connection is encrypted but the certificate
        is NOT verified.
    :return: ``socket.socket`` for http, ``ssl.SSLSocket`` otherwise
    """
    raw = socket.socket()
    if protocol == 'http':
        return raw

    try:
        if server_hostname is None:
            # The module-level ssl.wrap_socket() no longer exists in
            # Python 3.12+. This context reproduces what it did: TLS
            # without certificate or host name verification.
            # NOTE: that is open to man-in-the-middle attacks; pass
            # server_hostname to get a verifying context instead.
            context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            context.check_hostname = False  # must be cleared before CERT_NONE
            context.verify_mode = ssl.CERT_NONE
        else:
            context = ssl.create_default_context()
        return context.wrap_socket(raw, server_hostname=server_hostname)
    except Exception:
        # Do not leak the raw socket if the TLS wrapping fails.
        raw.close()
        raise
def response_by_socket(s):
    """
    Read from a socket until the peer stops sending.

    :param s: socket instance to read from
    :return: all received data joined in arrival order (bytes)
    """
    chunk_size = 1024
    # iter(callable, sentinel) keeps calling recv() and stops at the first
    # empty result, which is how recv() reports that the peer has closed.
    chunks = iter(lambda: s.recv(chunk_size), b'')
    return b''.join(chunks)
def parsed_response(r):
    """
    Parse a raw HTTP response into status code, headers and body.

    :param r: the complete response as a str
    :return: ``(status_code, headers, body)`` where status_code is an int,
        headers is a dict mapping header name to value (a repeated header
        keeps its last value) and body is a str (empty when the response
        has no blank-line separator)
    :raises ValueError: if the status line has no numeric status code
    """
    # Split at the FIRST blank line only; the body may contain blank lines.
    head, _, body = r.partition('\r\n\r\n')
    lines = head.split('\r\n')

    # Status line looks like: "HTTP/1.1 301 Moved Permanently".
    status_parts = lines[0].split()
    if len(status_parts) < 2:
        raise ValueError('malformed status line: {!r}'.format(lines[0]))
    status_code = int(status_parts[1])

    headers = {}
    for line in lines[1:]:
        # Split on the first ':' only, so values that contain ':' themselves
        # (URLs, timestamps, "key: value" text) stay intact, and a header
        # written without a space after the colon is still understood.
        name, colon, value = line.partition(':')
        if not colon:
            # Not a "name: value" line; nothing to record.
            continue
        headers[name.strip()] = value.strip()

    return status_code, headers, body
def get(url, max_redirects=10):
    """
    Send a GET request to url and return the parsed response.

    A 301 response is followed to the URL in its ``Location`` header, up to
    ``max_redirects`` times.

    :param url: URL to fetch
    :param max_redirects: maximum number of 301 hops to follow before
        giving up
    :return: status_code, headers, body
    :raises RuntimeError: if more than ``max_redirects`` redirects occur
    :raises KeyError: if a 301 response carries no ``Location`` header
    """
    encoding = 'utf-8'
    # A loop instead of recursion: a redirect cycle ends with a clear error
    # rather than exhausting the interpreter's recursion limit.
    for _ in range(max_redirects + 1):
        protocol, host, port, path = parsed_url(url)
        request = 'GET {} HTTP/1.1\r\nhost: {}\r\nConnection: close\r\n\r\n'.format(path, host)
        # The with-block closes the socket on success and on error alike, so
        # no connection is left open per request or per redirect hop.
        with socket_by_protocol(protocol) as s:
            s.connect((host, port))
            # sendall() keeps writing until the whole request is sent;
            # send() is allowed to transmit only part of it.
            s.sendall(request.encode(encoding))
            response = response_by_socket(s)
        status_code, headers, body = parsed_response(response.decode(encoding))
        if status_code != 301:
            return status_code, headers, body
        url = headers['Location']
    raise RuntimeError('too many redirects (more than {})'.format(max_redirects))
def main():
    """
    Fetch a fixed demo URL and print the parsed response to stdout.

    :return: None
    """
    status_code, headers, body = get('https://www.baidu.com/')
    # Alternate label / value so print() emits them in the original order,
    # separated by its default single space.
    pieces = []
    for label, value in (('status_code: ', status_code),
                         ('\r\nheaders: ', headers),
                         ('\r\nbody: ', body)):
        pieces.append(label)
        pieces.append(value)
    print(*pieces)
def test_parsed_url():
    """
    Check parsed_url against a table of URLs and their expected results.

    URL parsing is easy to get wrong, so each combination of scheme,
    explicit port and trailing slash is covered here.

    :return: None; raises AssertionError on the first mismatch
    """
    host, path = 'g.cn', '/'
    cases = [
        # plain http, with and without port / trailing slash
        ('http://g.cn', ('http', host, 80, path)),
        ('http://g.cn/', ('http', host, 80, path)),
        ('http://g.cn:90', ('http', host, 90, path)),
        ('http://g.cn:90/', ('http', host, 90, path)),
        # https, default and explicit port
        ('https://g.cn', ('https', host, 443, path)),
        ('https://g.cn:233/', ('https', host, 233, path)),
    ]
    for url, expected in cases:
        actual = parsed_url(url)
        assert actual == expected, \
            'parsed_url ERROR, ({}) ({}) ({})'.format(url, actual, expected)
def test_parsed_response():
    """
    Check that parsed_response splits a sample response correctly.

    :return: None; raises AssertionError if status, headers or body is wrong
    """
    raw = (
        'HTTP/1.1 301 Moved Permanently\r\n'
        'Content-Type: text/html\r\n'
        'Location: https://movie.douban.com/top250\r\n'
        'Content-Length: 178\r\n\r\n'
        'test body'
    )
    code, header_map, payload = parsed_response(raw)
    assert code == 301
    # The sample carries exactly three header lines.
    assert len(header_map) == 3
    assert payload == 'test body'
def test_get():
    """
    Smoke-test get() over both HTTP and HTTPS.

    Nothing is asserted: the test passes as long as get() does not raise.

    :return: None
    """
    targets = (
        'http://movie.douban.com/top250',
        'https://movie.douban.com/top250',
    )
    for target in targets:
        get(target)
def test():
    """
    Run all self-tests in order: URL parsing, response parsing, then get().

    :return: None
    """
    for check in (test_parsed_url, test_parsed_response, test_get):
        check()
if __name__ == "__main__":
    # Running the file as a script executes the self-tests.
    # Call main() here instead to fetch and print the demo page.
    test()