-
Notifications
You must be signed in to change notification settings - Fork 205
/
javdb.py
339 lines (298 loc) · 14.6 KB
/
javdb.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
"""从JavDB抓取数据"""
import os
import re
import sys
import logging
sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..')))
from web.base import Request, resp2html
from web.exceptions import *
from core.func import *
from core.avid import guess_av_type
from core.config import cfg
from core.datatype import MovieInfo, GenreMap
from core.chromium import get_browsers_cookies
# Initialize the Request instance. When using the scraper to bypass CloudFlare,
# the page language must be pinned via Accept-Language, otherwise the site may
# serve pages in another language and break the XPath parsing below.
request = Request(use_scraper=True)
request.headers['Accept-Language'] = 'zh-CN,zh;q=0.9,zh-TW;q=0.8,en-US;q=0.7,en;q=0.6,ja;q=0.5'
logger = logging.getLogger(__name__)
# Maps JavDB genre ids to normalized genre names (CSV shipped with the project)
genre_map = GenreMap('data/genre_javdb.csv')
permanent_url = 'https://javdb.com'
# With a proxy we can reach the permanent domain directly; otherwise fall back
# to the configured proxy-free mirror domain.
if cfg.Network.proxy:
    base_url = permanent_url
else:
    base_url = cfg.ProxyFree.javdb
def get_html_wrapper(url):
    """Wrap an outgoing request and convert the response into an XPath-able
    HTML tree, while handling invalid cookies, login redirects and site blocks.

    Args:
        url (str): the URL to fetch.

    Returns:
        the parsed HTML tree produced by ``resp2html``.

    Raises:
        CredentialError: every browser cookie in the pool has expired.
        SitePermissionError: the resource is restricted to VIP members.
        SiteBlocked: the site answered 403/503 (e.g. a CloudFlare block page).
        WebsiteError: any other unexpected HTTP status code.
    """
    global request, cookies_pool
    r = request.get(url, delay_raise=True)
    if r.status_code == 200:
        # A redirect may merely be a domain redirect, so also inspect the final
        # URL to tell whether we were bounced to the login page
        if r.history and '/login' in r.url:
            # Read browser cookies lazily, only when they are first needed
            if 'cookies_pool' not in globals():
                try:
                    cookies_pool = get_browsers_cookies()
                except (PermissionError, OSError) as e:
                    logger.warning(f"无法从浏览器Cookies文件获取JavDB的登录凭据({e}),可能是安全软件在保护浏览器Cookies文件", exc_info=True)
                    cookies_pool = []
                except Exception as e:
                    logger.warning(f"获取JavDB的登录凭据时出错({e}),你可能使用的是国内定制版等非官方Chrome系浏览器", exc_info=True)
                    cookies_pool = []
            if len(cookies_pool) > 0:
                item = cookies_pool.pop()
                # A fresh Request instance is required when switching cookies,
                # otherwise cloudscraper keeps the cookies it acquired on its
                # first network access
                request = Request(use_scraper=True)
                request.cookies = item['cookies']
                cookies_source = (item['profile'], item['site'])
                logger.debug(f'未携带有效Cookies而发生重定向,尝试更换Cookies为: {cookies_source}')
                # Retry recursively with the replacement cookies
                return get_html_wrapper(url)
            else:
                raise CredentialError('JavDB: 所有浏览器Cookies均已过期')
        elif r.history and 'pay' in r.url.split('/')[-1]:
            raise SitePermissionError(f"JavDB: 此资源被限制为仅VIP可见: '{r.history[0].url}'")
        else:
            html = resp2html(r)
            return html
    elif r.status_code in (403, 503):
        # Blocked: try to extract the CloudFlare-style error code from the page
        html = resp2html(r)
        code_tag = html.xpath("//span[@class='code-label']/span")
        error_code = code_tag[0].text if code_tag else None
        if error_code:
            if error_code == '1020':
                block_msg = f'JavDB: {r.status_code} 禁止访问: 站点屏蔽了来自日本地区的IP地址,请使用其他地区的代理服务器'
            else:
                block_msg = f'JavDB: {r.status_code} 禁止访问: {url} (Error code: {error_code})'
        else:
            block_msg = f'JavDB: {r.status_code} 禁止访问: {url}'
        raise SiteBlocked(block_msg)
    else:
        raise WebsiteError(f'JavDB: {r.status_code} 非预期状态码: {url}')
def get_user_info(site, cookies):
    """Fetch the JavDB user info (email, username) matching the given cookies.

    Returns a ``(email, username)`` tuple on success, or ``None`` when the
    request fails or the domain no longer points to a JavDB site.
    """
    try:
        request.cookies = cookies
        profile_page = request.get_html(f'https://{site}/users/profile')
    except Exception as e:
        logger.info('JavDB: 获取用户信息时出错')
        logger.debug(e, exc_info=1)
        return
    # The temporary domain recorded in scanned browser cookies may have
    # expired, so first confirm the domain still serves a JavDB site
    if 'JavDB' not in profile_page.text:
        logger.debug('JavDB: 域名已过期: ' + site)
        return
    raw_email = profile_page.xpath("//div[@class='user-profile']/ul/li[1]/span/following-sibling::text()")[0]
    raw_username = profile_page.xpath("//div[@class='user-profile']/ul/li[2]/span/following-sibling::text()")[0]
    return raw_email.strip(), raw_username.strip()
def get_valid_cookies():
    """Scan the browser-cookie candidates and return the first working cookies.

    Iterates the module-level ``cookies_pool`` (created lazily by
    ``get_html_wrapper``) and validates each candidate by fetching the user
    profile. Returns the cookies dict of the first valid candidate, or ``None``
    when none is valid.
    """
    # Note: tests showed the destination domain does not need to match the
    # login domain — valid cookies work across JavDB mirror domains.
    # BUG FIX: ``cookies_pool`` only exists after get_html_wrapper() has run
    # once; guard the lookup so calling this function first does not raise
    # NameError.
    for d in globals().get('cookies_pool', []):
        info = get_user_info(d['site'], d['cookies'])
        if info:
            return d['cookies']
        else:
            logger.debug(f"{d['profile']}, {d['site']}: Cookies无效")
def parse_data(movie: MovieInfo):
    """Scrape and parse the data for the given movie ID from JavDB.

    Args:
        movie (MovieInfo): movie to parse; parsed fields are written directly
            into this object.

    Raises:
        MovieNotFoundError: no search result matches the movie ID.
        MovieDuplicateError: more than one search result matches the ID.
    """
    # A JavDB search may return several results; locate the one whose ID matches
    html = get_html_wrapper(f'{base_url}/search?q={movie.dvdid}')
    ids = list(map(str.lower, html.xpath("//div[@class='video-title']/strong/text()")))
    movie_urls = html.xpath("//a[@class='box']/@href")
    match_count = len([i for i in ids if i == movie.dvdid.lower()])
    if match_count == 0:
        raise MovieNotFoundError(__name__, movie.dvdid, ids)
    elif match_count == 1:
        index = ids.index(movie.dvdid.lower())
        new_url = movie_urls[index]
        try:
            html2 = get_html_wrapper(new_url)
        except (SitePermissionError, CredentialError):
            # Detail page is VIP-only. Squeeze whatever the search result card
            # offers, since sometimes only JavDB has the title and cover.
            box = html.xpath("//a[@class='box']")[index]
            movie.url = new_url
            movie.title = box.get('title')
            movie.cover = box.xpath("div/img/@src")[0]
            score_str = box.xpath("div[@class='score']/span/span")[0].tail
            score = re.search(r'([\d.]+)分', score_str).group(1)
            # JavDB scores are out of 5; normalize to a 10-point scale
            movie.score = "{:.2f}".format(float(score)*2)
            movie.publish_date = box.xpath("div[@class='meta']/text()")[0].strip()
            return
    else:
        raise MovieDuplicateError(__name__, movie.dvdid, match_count)
    container = html2.xpath("/html/body/section/div/div[@class='video-detail']")[0]
    info = container.xpath("//nav[@class='panel movie-panel-info']")[0]
    title = container.xpath("h2/strong[@class='current-title']/text()")[0]
    # Only read the original title when the 'show original title' link is visible
    show_orig_title = container.xpath("//a[contains(@class, 'meta-link') and not(contains(@style, 'display: none'))]")
    if show_orig_title:
        movie.ori_title = container.xpath("h2/span[@class='origin-title']/text()")[0]
    cover = container.xpath("//img[@class='video-cover']/@src")[0]
    preview_pics = container.xpath("//a[@class='tile-item'][@data-fancybox='gallery']/@href")
    preview_video_tag = container.xpath("//video[@id='preview-video']/source/@src")
    if preview_video_tag:
        preview_video = preview_video_tag[0]
        # Protocol-relative URL: prefix the https scheme
        if preview_video.startswith('//'):
            preview_video = 'https:' + preview_video
        movie.preview_video = preview_video
    dvdid = info.xpath("div/span")[0].text_content()
    publish_date = info.xpath("div/strong[text()='日期:']")[0].getnext().text
    duration = info.xpath("div/strong[text()='時長:']")[0].getnext().text.replace('分鍾', '').strip()
    director_tag = info.xpath("div/strong[text()='導演:']")
    if director_tag:
        movie.director = director_tag[0].getnext().text_content().strip()
    # FC2 movies list a 'seller' (賣家) field instead of a 'producer' (片商)
    av_type = guess_av_type(movie.dvdid)
    if av_type != 'fc2':
        producer_tag = info.xpath("div/strong[text()='片商:']")
    else:
        producer_tag = info.xpath("div/strong[text()='賣家:']")
    if producer_tag:
        movie.producer = producer_tag[0].getnext().text_content().strip()
    publisher_tag = info.xpath("div/strong[text()='發行:']")
    if publisher_tag:
        movie.publisher = publisher_tag[0].getnext().text_content().strip()
    serial_tag = info.xpath("div/strong[text()='系列:']")
    if serial_tag:
        movie.serial = serial_tag[0].getnext().text_content().strip()
    score_tag = info.xpath("//span[@class='score-stars']")
    if score_tag:
        score_str = score_tag[0].tail
        score = re.search(r'([\d.]+)分', score_str).group(1)
        # Normalize the 5-point score to a 10-point scale
        movie.score = "{:.2f}".format(float(score)*2)
    genre_tags = info.xpath("//strong[text()='類別:']/../span/a")
    genre, genre_id = [], []
    for tag in genre_tags:
        pre_id = tag.get('href').split('/')[-1]
        genre.append(tag.text)
        genre_id.append(pre_id)
        # Decide censored/uncensored from the sub-site in the genre link:
        # 'uncensored' -> True, 'tags' -> False, anything else -> None
        subsite = pre_id.split('?')[0]
        movie.uncensored = {'uncensored': True, 'tags':False}.get(subsite)
    # JavDB lists both male and female performers; keep only actresses, which
    # are marked with the female gender symbol
    actors_tag = info.xpath("//strong[text()='演員:']/../span")[0]
    all_actors = actors_tag.xpath("a/text()")
    genders = actors_tag.xpath("strong/text()")
    actress = [i for i in all_actors if genders[all_actors.index(i)] == '♀']
    magnet = container.xpath("//div[@class='magnet-name column is-four-fifths']/a/@href")
    movie.dvdid = dvdid
    # Store the permanent domain in the URL so it stays valid when mirrors rotate
    movie.url = new_url.replace(base_url, permanent_url)
    movie.title = title.replace(dvdid, '').strip()
    movie.cover = cover
    movie.preview_pics = preview_pics
    movie.publish_date = publish_date
    movie.duration = duration
    movie.genre = genre
    movie.genre_id = genre_id
    movie.actress = actress
    movie.magnet = [i.replace('[javdb.com]','') for i in magnet]
def parse_clean_data(movie: MovieInfo):
    """Parse the movie data for the given ID and clean the result.

    Args:
        movie (MovieInfo): movie to parse; fields are updated in place.

    Raises:
        SiteBlocked: re-raised from ``parse_data`` when the site blocks us.
    """
    try:
        parse_data(movie)
        # Verify the cover URL actually resolves to an image
        if movie.cover is not None:
            r = request.head(movie.cover)
            if r.status_code != 200:
                movie.cover = None
    except SiteBlocked:
        # BUG FIX: this log call used to sit AFTER the bare `raise` and was
        # therefore unreachable dead code; log the hint before re-raising
        logger.error('JavDB: 可能触发了反爬虫机制,请稍后再试')
        raise
    # FC2 genre ids have no entry in the mapping table, so skip them
    if movie.genre_id and (not movie.genre_id[0].startswith('fc2?')):
        movie.genre_norm = genre_map.map(movie.genre_id)
        # Nothing else needs the raw ids; clearing marks the mapping as done
        movie.genre_id = None
def collect_actress_alias(type=0, use_original=True):
    """Collect actress aliases from JavDB and merge them into a local JSON file.

    Args:
        type: category to crawl: 0 - censored, 1 - uncensored, 2 - western.
        use_original: key entries by the original name rather than the
            translated one, e.g. True -> 田中レモン, False -> 田中檸檬.
    """
    import json
    import time
    import random
    actressAliasMap = {}
    actressAliasFilePath = "data/actress_alias.json"
    # Check whether the file exists
    if not os.path.exists(actressAliasFilePath):
        # If not, create it with an empty dict so later read/merge cycles work
        with open(actressAliasFilePath, "w", encoding="utf-8") as file:
            json.dump({}, file)
    typeList = ["censored", "uncensored", "western"]
    page_url = f"{base_url}/actors/{typeList[type]}"
    while True:
        try:
            html = get_html_wrapper(page_url)
            actors = html.xpath("//div[@class='box actor-box']/a")
            count = 0
            for actor in actors:
                count += 1
                # NOTE(review): actor_name is parsed but never used below
                actor_name = actor.xpath("strong/text()")[0].strip()
                actor_url = actor.xpath("@href")[0]
                # actor_url = f"https://javdb.com{actor_url}" # build the actor page's full URL
                # Visit the actor's own page for the full name information
                actor_html = get_html_wrapper(actor_url)
                # Parse all of the actor's names and aliases
                names_span = actor_html.xpath("//span[@class='actor-section-name']")[0]
                aliases_span_list = actor_html.xpath("//span[@class='section-meta']")
                # NOTE(review): aliases are read from index 0 although their
                # presence is detected via the list length below — confirm the
                # first 'section-meta' span really holds the aliases
                aliases_span = aliases_span_list[0]
                names_list = [name.strip() for name in names_span.text.split(",")]
                if len(aliases_span_list) > 1:
                    aliases_list = [
                        alias.strip() for alias in aliases_span.text.split(",")
                    ]
                else:
                    aliases_list = []
                # Record the entry: key is the original (last) or translated
                # (first) name, value is every known name and alias
                actressAliasMap[names_list[-1 if use_original else 0]] = (
                    names_list + aliases_list
                )
                print(
                    f"{count} --- {names_list[-1 if use_original else 0]}: {names_list + aliases_list}"
                )
                if count == 10:
                    # Flush the collected data back to the file
                    with open(actressAliasFilePath, "r", encoding="utf-8") as file:
                        existing_data = json.load(file)
                    # Merge the existing data with the newly crawled data
                    existing_data.update(actressAliasMap)
                    # Write the merged data back to the file
                    with open(actressAliasFilePath, "w", encoding="utf-8") as file:
                        json.dump(existing_data, file, ensure_ascii=False, indent=2)
                    actressAliasMap = {}  # reset actressAliasMap
                    print(
                        f"已爬取 {count} 个女优,数据已更新并写回文件:",
                        actressAliasFilePath,
                    )
                    # Reset the counter
                    count = 0
                time.sleep(max(1, 10 * random.random()))  # random 1-10 s delay between actors
            # Check whether there is a next-page button
            next_page_link = html.xpath(
                "//a[@rel='next' and @class='pagination-next']/@href"
            )
            if not next_page_link:
                break  # no next page: stop crawling
            else:
                next_page_url = f"{next_page_link[0]}"
                page_url = next_page_url
        except SiteBlocked:
            raise
    # Final flush after the last page: merge what remains with the file contents
    with open(actressAliasFilePath, "r", encoding="utf-8") as file:
        existing_data = json.load(file)
    # Merge the existing data with the newly crawled data
    existing_data.update(actressAliasMap)
    # Write the merged data back to the file
    with open(actressAliasFilePath, "w", encoding="utf-8") as file:
        json.dump(existing_data, file, ensure_ascii=False, indent=2)
    print(f"已爬取 {count} 个女优,数据已更新并写回文件:", actressAliasFilePath)
if __name__ == "__main__":
    # Ad-hoc manual test entry: scrape one movie and print the parsed result
    import pretty_errors
    pretty_errors.configure(display_link=True)
    # NOTE(review): assumes the root logger's second handler is the console
    # handler — confirm against the project's logging setup
    logger.root.handlers[1].level = logging.DEBUG
    # collect_actress_alias()
    movie = MovieInfo('FC2-2735981')
    try:
        parse_clean_data(movie)
        print(movie)
    except CrawlerError as e:
        logger.error(e, exc_info=1)