-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.py
391 lines (335 loc) · 13.1 KB
/
main.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
from abc import ABCMeta, abstractmethod
import requests
from bs4 import BeautifulSoup
import time
import webbrowser
class Search(metaclass=ABCMeta):
    """Abstract base for a paginated web-search scraper.

    Subclasses implement ``get_report`` (fetch one results page),
    ``find_word`` (extract ``(title, url)`` pairs into ``word_list``) and
    the iterator protocol (``__iter__``/``__next__``) used to turn pages.
    """

    def __init__(self):
        self.url = ""                         # engine base URL, set by the subclass
        self.args = ""                        # query string / path of the current page
        self.bd_session = requests.Session()  # reused session keeps cookies between pages
        self.report = None                    # raw HTML of the most recently fetched page
        self.bs4 = None                       # BeautifulSoup tree built from `report`
        self.word_list = []                   # (title, url) results of the current page
        self.url_dict = {}                    # url -> title, used to de-duplicate results
        self.page_num = 0                     # page counter (0-based)
        self.referer = ""                     # previous page URL, sent as the Referer header
        self.headers = {
            'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3',
            'accept-language': 'zh-CN,zh;q=0.9',
            'cache-control': 'max-age=0',
            'sec-fetch-dest': 'document',
            'sec-fetch-mode': 'navigate',
            'sec-fetch-site': 'none',
            'sec-fetch-user': '?1',
            'connection': 'close',
            # BUG FIX: the dict previously also contained a second key
            # "Upgrade-Insecure-Requests" (different case), so the header was
            # sent twice on every request; the duplicate has been removed.
            'upgrade-insecure-requests': '1',
            'accept-encoding': 'gzip, deflate',
            "content-type": "application/x-www-form-urlencoded",
            'user-agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36 QIHU 360SE',
        }

    @abstractmethod
    def get_report(self, args_list, start):
        """Fetch a results page; must set ``report`` and ``bs4``."""

    def bs_paser(self) -> None:
        """Parse ``self.report`` into a BeautifulSoup tree.

        Raises:
            ValueError: if no page has been fetched yet.
        """
        if not self.report:
            # was `assert self.report, ...`: an assert disappears under
            # `python -O`, so validate with a real exception instead
            raise ValueError("Don't get report")
        self.bs4 = BeautifulSoup(self.report, 'html.parser')

    @abstractmethod
    def find_word(self):
        """Extract results from ``bs4`` into ``word_list``."""

    @abstractmethod
    def __iter__(self):
        """Prepare page iteration."""

    @abstractmethod
    def __next__(self):
        """Advance to the next page or raise StopIteration."""

    def output_word(self):
        """Return the (title, url) results of the current page."""
        return self.word_list

    def return_page(self):
        """Return the current page number."""
        return self.page_num
class BingWeb(Search):
    """Bing (cn.bing.com) search scraper."""

    def __init__(self):
        super().__init__()
        self.url = "https://cn.bing.com"
        self.headers["Origin"] = "https://cn.bing.com"
        self.headers['host'] = 'cn.bing.com'

    def get_report(self, args_list=None, start=True):
        """Fetch one results page.

        Args:
            args_list: query keywords; when given, a new search is started.
            start: True resets the page counter (fresh search).
        Returns:
            self, so calls can be chained.
        """
        if args_list:
            self.args = "?" + "q=" + args_list
        if start:
            self.page_num = 0
        if self.referer:
            # send the previous page as Referer, like a real browser would
            self.headers["referer"] = self.referer
        self.referer = self.url + self.args
        self.report = self.bd_session.get(self.referer, headers=self.headers).text
        self.bs_paser()
        return self

    def find_word(self) -> None:
        """Extract (title, url) results from the parsed page into word_list."""
        self.word_list = []
        # Bing special cards: dictionary (dict_oa) and video (vsa) answers
        for w in self.bs4.find_all("li", class_="b_ans"):
            dict_oa = w.find("div", class_="dict_oa")
            vsa = w.find("div", class_="vsa")
            try:
                if dict_oa:  # dictionary card
                    self.append_word_list("[bing词典]" + dict_oa.div.div.h2.a.text,
                                          self.url + dict_oa.div.div.h2.a.get("href"))
                elif vsa:  # video card
                    self.append_word_list("[bing视频]" + vsa.h2.a.text,
                                          self.url + vsa.h2.a.get("href"))
            except AttributeError:
                # card markup did not match the expected nesting; skip it
                pass
        # image answer cards
        for w in self.bs4.find_all("li", class_="b_ans b_mop b_imgans b_imgsmall"):
            irphead = w.find("div", class_="irphead")
            try:
                if irphead:
                    self.append_word_list("[bing图片]" + irphead.h2.a.text,
                                          self.url + irphead.h2.a.get("href"))
            except AttributeError:
                pass
        # b_algo entries: ordinary results, or an official-site card (has b_title)
        for w in self.bs4.find_all("li", class_="b_algo"):
            title = w.find("div", class_="b_title")
            try:
                if title:  # official-site card
                    self.append_word_list(title.h2.a.text, title.h2.a.get("href"))
                else:  # ordinary result
                    self.append_word_list(w.h2.a.text, w.h2.a.get("href"))
            except AttributeError:
                pass

    def append_word_list(self, title, url):
        """Record a result, skipping urls that were already seen."""
        if not self.url_dict.get(url, None):
            self.url_dict[url] = title
            self.word_list.append((title, url))

    def __iter__(self):
        self.page_num = -1  # sentinel: first __next__ uses the page already fetched
        return self

    def __next__(self) -> bool:
        if self.page_num == -1:  # first iteration: get_report already fetched page 0
            self.page_num += 1
            return True
        self.page_num += 1
        next_link = self.bs4.find("a", title="下一页")
        if not next_link:
            raise StopIteration
        self.args = next_link.get("href")
        # BUG FIX: the original did `self.report = self.get_report(None, False)`,
        # clobbering the HTML text with the object get_report returns (self);
        # get_report already stores the new page in self.report.
        self.get_report(None, False)
        return True
class BaiduWeb(Search):
    """Baidu (www.baidu.com) search scraper."""

    def __init__(self):
        super().__init__()
        self.url = "https://www.baidu.com"
        self.headers["Origin"] = "https://www.baidu.com"
        self.headers['host'] = 'www.baidu.com'

    def get_report(self, args_list=None, start=True):
        """Fetch one results page.

        Args:
            args_list: query keywords; when given, a new search is started.
            start: True resets the page counter (fresh search).
        Returns:
            self, so calls can be chained.
        """
        if args_list:
            self.args = "/s?" + "wd=" + args_list
        if start:
            self.page_num = 0
        if self.referer:
            # send the previous page as Referer, like a real browser would
            self.headers["referer"] = self.referer
        self.referer = self.url + self.args
        self.report = self.bd_session.get(self.referer, headers=self.headers).text
        self.bs_paser()
        return self

    def find_word(self) -> None:
        """Extract (title, url) results from the parsed page into word_list."""
        self.word_list = []
        # Baidu Baike (encyclopedia) cards
        for w in self.bs4.find_all("div", class_="result-op c-container xpath-log", tpl="bk_polysemy"):
            try:
                self.append_word_list("[百度百科]" + str(w.h3.a.text).replace("\n", ""), w.h3.a.get("href"))
            except AttributeError:
                # card markup did not match the expected nesting; skip it
                pass
        # ordinary results
        for w in self.bs4.find_all("div", class_="result c-container"):
            try:
                self.append_word_list(w.h3.a.text, w.h3.a.get("href"))
            except AttributeError:
                pass
        # special operator cards (content lives under c-result-content/section)
        for w in self.bs4.find_all("div", class_="result-op c-container"):
            try:
                title = w.find("div", class_="c-result-content").find("section")
                self.append_word_list(title.a.h3.span.text, title.a.get("href"))
            except AttributeError:
                pass

    def append_word_list(self, title, url):
        """Record a result, skipping urls that were already seen.

        (Removed a commented-out block that resolved Baidu redirect urls with
        an extra blocking request per result.)
        """
        if not self.url_dict.get(url, None):
            self.url_dict[url] = title
            self.word_list.append((title, url))

    def __iter__(self):
        self.page_num = -1  # sentinel: first __next__ uses the page already fetched
        return self

    def __next__(self) -> bool:
        if self.page_num == -1:  # first iteration: get_report already fetched page 0
            self.page_num += 1
            return True
        self.page_num += 1
        page = self.bs4.find("div", id="page")
        if not page:
            raise StopIteration
        next_page_list = self.bs4.find_all("a", class_="n")
        if not next_page_list:
            raise StopIteration
        next_page = next_page_list[-1]
        # the last "n" link is "next page" only on non-final pages
        if not str(next_page.text).startswith("下一页"):
            raise StopIteration
        self.args = next_page.get("href")
        # BUG FIX: the original did `self.report = self.get_report(None, False)`,
        # clobbering the HTML text with the object get_report returns (self);
        # get_report already stores the new page in self.report.
        self.get_report(None, False)
        return True
class Seacher:  # the "searcher": drives every engine at once for one keyword
    """Aggregate Bing and Baidu results for a single keyword.

    Iterating a Seacher yields one formatted text block per page round,
    combining the next page from every engine that still has results.
    """

    def __init__(self, word: str):
        self.web = {"bing": BingWeb(), "baidu": BaiduWeb()}
        self.word = word                 # keyword being searched
        self.first = True                # True until the first __next__ call
        self.old_return_str = ""         # last formatted block, replayed by out_again()
        self.web_name_dict = {}          # title -> where it was first seen (dedup note)
        self.url_list = []               # every url shown, addressable by index

    def find(self):
        """Fetch page one on every engine and prime their iterators."""
        for engine in self.web.values():
            iter(engine.get_report(self.word))
        return self

    def __iter__(self):
        self.first = True
        return self

    def __next__(self):
        if self.first:
            self.first = False
        else:
            time.sleep(1)  # small pause between page rounds
        out = ""
        for engine_name, engine in self.web.items():
            try:
                next(engine)  # engines that are exhausted are simply skipped
            except StopIteration:
                continue
            engine.find_word()
            results: list = engine.output_word()
            out += "\n" + "* " * 20 + f"\n{engine.return_page()}: [{engine_name}] for {self.word} >>>\n"
            for title, url in results:
                seen = self.web_name_dict.get(title, None)
                if seen:
                    # same title appeared before: annotate with its first sighting
                    out += f"[{len(self.url_list)}][曾经出现过 {seen}] {title}\n{' ' * 8}-> {url}\n"
                else:
                    out += f"[{len(self.url_list)}]{title}\n{' ' * 8}-> {url}\n"
                    self.web_name_dict[title] = f"{engine_name}, page: {engine.return_page()}, [{len(self.url_list)}]"
                self.url_list.append(url)
            out += "* " * 20 + "\n"
        self.old_return_str = out
        return out

    def out_again(self):  # replay the previous output
        """Return the last formatted result block without fetching anything."""
        return self.old_return_str

    def open_url(self, num) -> None:
        """Open result number `num` in a browser tab; ignore out-of-range numbers."""
        try:
            target = self.url_list[num]
        except IndexError:  # number too large
            return None
        webbrowser.open_new_tab(target)
        time.sleep(3)

    @staticmethod
    def is_next():
        return input("next? [Y/n]") != "n"
class Menu:
    """Interactive command loop; a command `<x>` dispatches to `func_<x>`."""

    def __init__(self):
        self.searcher_dict = {}       # name -> active Seacher (pages still remaining)
        self.searcher_dict_old = {}   # name -> finished Seacher (kept for open/again)
        print("Welcome To SSearch!")

    def menu(self) -> None:
        """Run the command loop until the user quits."""
        while True:
            try:
                if not self.__menu():
                    break
            except KeyboardInterrupt:
                print("\n", end="")
            except Exception as e:
                # was `except BaseException`, which also swallowed SystemExit
                # and GeneratorExit; narrowed so real exit requests propagate
                print(f"There are some Error:\n{e}\n")

    def __menu(self):
        """Read and dispatch a single command; return False to stop the loop."""
        try:
            command = input('[\033[4mSSearch\033[0m] > ')  # read one command
        except KeyboardInterrupt:
            print("\nPlease Enter 'quit' or 'q' to quit")
            return True
        if command == "q" or command == "quit":
            print("SSearch: Bye Bye!")
            return False  # stop the loop
        # SECURITY FIX: was `exec(f"self.func_{command}()")`, which executed
        # arbitrary user input as Python; getattr can only reach func_* methods
        handler = getattr(self, f"func_{command}", None)
        if handler is None:
            print("Not Support Command. [help]")
        else:
            handler()
        return True

    def func_make(self):
        """Create a named searcher and print its first round of results."""
        word = input("输入关键词:")
        name = input(f"输入名字[默认={word}]:")
        if not name:
            name = word
        self.searcher_dict[name] = Seacher(word)    # build a searcher
        self.searcher_dict[name].find().__iter__()  # prime iteration
        self.func_next(name, True)

    def func_again(self, name=None):
        """Re-print the cached output of the searcher called `name`."""
        if not name:
            name = input("输入名字:")
        seacher_iter = self.searcher_dict.get(name, None)
        if not seacher_iter:
            print("没有找到对应搜索器或搜索器已经搜索结束")
        else:
            print(seacher_iter.out_again())

    def func_open(self):
        """Open one numbered result of a (possibly finished) searcher."""
        name = input("输入名字:")
        try:
            num = int(input("输入代号:"))
        except ValueError:
            print("请输入数字代号")
            return
        seacher_iter = self.searcher_dict.get(name, None)
        seacher_iter_old = self.searcher_dict_old.get(name, None)
        if seacher_iter:
            seacher_iter.open_url(num)
        elif seacher_iter_old:
            seacher_iter_old.open_url(num)
        else:
            print("没有找到对应搜索器或搜索器已经搜索结束")

    def func_next(self, name=None, first=False):
        """Advance the searcher called `name`; archive it when exhausted."""
        if not name:
            name = input("输入名字:")
        if not first:
            self.func_again(name)  # print the block buffered on the previous call
        seacher_iter = self.searcher_dict.get(name, None)
        if not seacher_iter:
            print("没有找到对应搜索器或搜索器已经搜索结束")
        else:
            try:
                if first:  # `make` prints its first round immediately
                    out = seacher_iter.__next__()
                    print(out)
                seacher_iter.__next__()  # buffer the next round for later printing
            except StopIteration:
                self.func_again(name)  # print the final buffered block
                # keep the finished searcher reachable for func_open/func_again
                self.searcher_dict_old[name] = self.searcher_dict[name]
                del self.searcher_dict[name]
                print(f"{name}: [搜索结束]")
            except AttributeError as e:
                print(f"There are some Error:\n{e}\n")
if __name__ == "__main__":
    # Entry point: start the interactive menu loop.
    Menu().menu()