In [1]:
import pandas as pd
# ^^^ pyforest auto-imports - don't write above this line
from functools import reduce
from hashlib import md5
import urllib.parse
import time
import requests
import csv
import numpy as np
import json
from datetime import datetime
from retrying import retry

In [2]:
# Permutation of the indices 0..63. getMixinKey walks this table to pick
# characters out of the concatenated img_key + sub_key string.
mixinKeyEncTab = [
    46, 47, 18,  2, 53,  8, 23, 32,
    15, 50, 10, 31, 58,  3, 45, 35,
    27, 43,  5, 49, 33,  9, 42, 19,
    29, 28, 14, 39, 12, 38, 41, 13,
    37, 48,  7, 16, 24, 55, 40, 61,
    26, 17,  0,  1, 60, 51, 30,  4,
    22, 25, 54, 21, 56, 59,  6, 63,
    57, 62, 11, 36, 20, 34, 44, 52,
]

In [3]:
def getMixinKey(orig: str):
    'Shuffle the characters of the joined imgKey + subKey and keep the first 32.'
    # Every index in the table is read before truncating, so an `orig` that is
    # too short for the table still raises IndexError.
    shuffled = ''.join(orig[idx] for idx in mixinKeyEncTab)
    return shuffled[:32]

def encWbi(params: dict, img_key: str, sub_key: str):
    """Sign request parameters with the wbi scheme.

    A ``wts`` timestamp (current Unix time, rounded to whole seconds) is added,
    the parameters are ordered by key, the characters ``!'()*`` are removed
    from every value, and ``w_rid`` is set to the MD5 hex digest of the
    url-encoded query string followed by the mixin key.

    Args:
        params: Request parameters. The mapping is copied; the caller's dict is
            left untouched (an existing ``wts`` entry is overridden in the copy).
        img_key: First half of the key material passed to ``getMixinKey``.
        sub_key: Second half of the key material passed to ``getMixinKey``.

    Returns:
        A new dict whose values are strings, ordered by key, with ``w_rid``
        appended as the last entry.
    """
    mixin_key = getMixinKey(img_key + sub_key)

    # Work on a copy so that signing does not leak 'wts' into the caller's dict.
    unsigned = dict(params)
    unsigned['wts'] = round(time.time())

    # Deletion table for the characters that must not appear in any value.
    strip_reserved = str.maketrans('', '', "!'()*")
    signed = {
        key: str(value).translate(strip_reserved)
        for key, value in sorted(unsigned.items())
    }

    query = urllib.parse.urlencode(signed)
    signed['w_rid'] = md5((query + mixin_key).encode()).hexdigest()
    return signed

def getWbiKeys() -> tuple[str, str]:
    """Fetch the current img_key and sub_key from the nav endpoint.

    Each key is the file-name stem (text after the last ``/`` and before the
    first ``.``) of the ``img_url`` / ``sub_url`` found under
    ``data.wbi_img`` in the response.

    Returns:
        ``(img_key, sub_key)``.

    Raises:
        requests.HTTPError: The endpoint answered with an error status.
        requests.Timeout: No response arrived within the timeout.
        KeyError: The response JSON lacks the expected ``data.wbi_img`` fields.
    """
    # Without a timeout a stalled connection would block forever.
    resp = requests.get('https://api.bilibili.com/x/web-interface/nav', timeout=10)
    resp.raise_for_status()
    wbi_img = resp.json()['data']['wbi_img']
    img_url: str = wbi_img['img_url']
    sub_url: str = wbi_img['sub_url']

    def _stem(url: str) -> str:
        # ".../<key>.<ext>" -> "<key>"
        return url.rsplit('/', 1)[1].split('.')[0]

    return _stem(img_url), _stem(sub_url)

In [4]:
def get_url(uid,pn):
    """Build the signed arc/search URL for page `pn` of uploader `uid`.

    Fresh wbi keys are fetched with getWbiKeys() on every call, the query
    parameters are signed with encWbi(), and the signed parameters are
    url-encoded onto the endpoint address.
    """
    endpoint = 'https://api.bilibili.com/x/space/wbi/arc/search?'
    search_params = {
        'mid': uid,
        'ps': 50,
        'tid': 0,
        'pn': pn,
        'keyword': '',
        'order': 'pubdate',
        'platform': 'web',
        'web_location': '1550101',
        'order_avoided':'true'
    }
    img_key, sub_key = getWbiKeys()
    signed_params = encWbi(params=search_params, img_key=img_key, sub_key=sub_key)
    return endpoint + urllib.parse.urlencode(signed_params)

In [5]:
def save_file(data):
    """Append every string in `data`, each followed by a comma, to f_video_list.json."""
    with open('f_video_list.json', 'a',encoding='utf-8') as out:
        # writelines consumes the generator item by item, like an explicit loop.
        out.writelines(entry + "," for entry in data)

In [6]:
def _fetch_video_page(mid, pn, header):
    'Download page `pn` of uploader `mid` and return the decoded JSON body.'
    # The timeout keeps a stalled connection from blocking; the resulting
    # exception is handled by the @retry wrapper on get_videos_list.
    return requests.get(url=get_url(mid, pn), headers=header, timeout=30).json()


def _serialize_vlist(response):
    'Turn every entry of response["data"]["list"]["vlist"] into a JSON string.'
    return [json.dumps(item, ensure_ascii=False)
            for item in response['data']['list']['vlist']]


def _close_json_array(path):
    'Replace a trailing "," in the file at `path` with "]", or append "]" if there is none.'
    with open(path, 'rb+') as f:
        end = f.seek(0, 2)              # 2 == SEEK_END; returns the file size
        if end:
            f.seek(end - 1)
            if f.read(1) == b',':       # "," is a single byte in UTF-8
                f.seek(end - 1)
                f.truncate()
        f.seek(0, 2)
        f.write(b']')


@retry(stop_max_attempt_number=10,wait_fixed=2000)
def get_videos_list(mid, header=None):
    """Download all videos of uploader `mid` and append them to f_video_list.json.

    The output file is one JSON array assembled across calls: the call for
    ``up_list[0]`` writes the opening ``[``, every call appends its videos as
    ``<json>,`` fragments via save_file(), and the call for ``up_list[-1]``
    turns the trailing comma into the closing ``]``. A list with a single
    uploader therefore gets both brackets.

    All pages are collected in memory before anything is written, so a request
    that fails part-way (and is retried by the decorator) cannot leave
    duplicated pages in the file.

    Args:
        mid: Uploader id; compared against the global ``up_list`` to decide
            whether the array brackets have to be written.
        header: HTTP headers for the requests. Defaults to a minimal
            browser-like header; pass your own (for example one containing a
            Cookie) if the API rejects the default - TODO confirm what the
            endpoint requires.

    Returns:
        None. If the first page does not have the expected shape,
        'Invalid response format' is printed and the uploader contributes no
        videos, but the array brackets are still written.
    """
    if header is None:
        header = {
            'User-Agent': ('Mozilla/5.0 (Windows NT 10.0; Win64; x64) '
                           'AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/120.0.0.0 Safari/537.36'),
            'Referer': 'https://www.bilibili.com/',
        }
    page_size = 50                      # must match 'ps' in get_url
    output_path = 'f_video_list.json'
    is_first = mid == up_list[0]
    is_last = mid == up_list[-1]

    # The first page also tells us how many videos the uploader has.
    video_lst = []
    response = _fetch_video_page(mid, 1, header)
    data = response.get('data') if isinstance(response, dict) else None
    if not isinstance(data, dict) or 'list' not in data:
        print('Invalid response format')
    else:
        videos_count = data['page']['count']
        # Ceiling division: exactly 50 videos is one page, not two.
        pages = (videos_count + page_size - 1) // page_size
        video_lst.extend(_serialize_vlist(response))
        for pn in range(2, pages + 1):
            # A malformed later page raises here, before any write happened,
            # so the retry starts again from a clean state.
            video_lst.extend(_serialize_vlist(_fetch_video_page(mid, pn, header)))
            time.sleep(3)

    if is_first:
        with open(output_path, 'a', encoding='utf-8') as f:
            f.write("[")
    save_file(video_lst)
    if is_last:
        _close_json_array(output_path)

In [7]:
# Load the collaborating uploaders' uids (up_uid) from the source file.
# allow_pickle is enabled for this call only, instead of patching
# np.load.__defaults__, which would stay patched if the load raised.
# NOTE(security): unpickling can execute arbitrary code - only load an
# up_list.npy that comes from a trusted source.
up_list = np.load('up_list.npy', allow_pickle=True).tolist()

In [None]:
# Download the video list of every uploader in up_list order. n counts the
# uploaders whose download has completed and is printed as a progress marker;
# it is only incremented after get_videos_list returns.
n = 0
for i in up_list:
    get_videos_list(i)
    n += 1
    print(n)

In [5]:
# Read the scraped video list back as one JSON document.
with open('f_video_list.json','r',encoding='utf-8') as f:
    raw_text = f.read()
    json_data = json.loads(raw_text)

In [6]:
# Column order of the exported CSV. DictWriter raises ValueError if a record
# in json_data has a key that is not listed here.
csv_header = [
    'bvid',
    'mid',
    'description',
    'author',
    'pic',
    'created',
    'play',
    'length',
    'title',
    'copyright',
    'is_pay',
    'hide_click',
    'is_charging_arc',
    'aid',
    'meta',
    'vt',
    'is_live_playback',
    'video_review',
    'is_steins_gate',
    'enable_vt',
    'comment',
    'attribute',
    'is_union_video',
    'is_avoided',
    'subtitle',
    'review',
    'typeid',
]
with open('T_video_list_csv.csv', mode='w', encoding='utf-8', newline='') as f:
    dictWriter = csv.DictWriter(f, fieldnames=csv_header)
    dictWriter.writeheader()
    dictWriter.writerows(json_data)

In [7]:
# Reload the exported CSV, keeping 'play' and 'title' as strings.
df = pd.read_csv(
    'T_video_list_csv.csv',
    dtype={'play': str, 'title': str},
)

In [8]:
# Drop the columns that are not needed downstream. errors='ignore' keeps the
# cell re-runnable: on a second run the columns are already gone and a plain
# drop would raise KeyError.
df = df.drop(columns=['pic','description','subtitle','title'], errors='ignore')

In [9]:
# Save the trimmed table as UTF-8 CSV without the index column.
df.to_csv(
    "new_video_list.csv",
    encoding="utf-8",
    index=False,
)