获取所有 up 的视频信息

In [15]:
from pathlib import Path
from pydantic import BaseModel, TypeAdapter
from typing import List


class Up(BaseModel):
    """One uploader ("up") entry parsed from a row of the grouping markdown table."""

    # First table cell; a row is only treated as an uploader when this cell
    # starts with a digit. Later used as the key for fetched videos.
    uuid: str
    # Second table cell; written next to the uploader name in the report.
    content: str
    # Third table cell; used as the uploader's heading in the report.
    up_name: str


class Group(BaseModel):
    """A named group of uploaders, opened by a markdown heading line."""

    # Heading text taken from a line that starts with "#".
    group_name: str
    # Uploaders collected from the table rows that follow the heading,
    # in the order they appear in the file.
    up_list: List[Up]


# Location of the hand-maintained markdown file that lists uploaders by group.
markdown_path = Path() / "data" / "我的up分组.md"

with open(markdown_path, "r", encoding="utf-8") as f:
    markdown_lines = f.readlines()

# Parse result:
#   group_list        - every group found, in file order
#   all_ups_uuid_list - uuid of every uploader row, in file order
group_list: List[Group] = []
group = None
all_ups_uuid_list = []
for line in markdown_lines:
    if line.startswith("#"):
        # A heading line opens a new group; flush the one being built.
        if group is not None:
            group_list.append(group)
        # Drop every leading "#" so "## name" does not keep a stray "#".
        group = Group(group_name=line.lstrip("#").strip(), up_list=[])
        continue

    if group is None or not line.startswith("|"):
        # Text before the first heading, or anything that is not a table row.
        continue

    cells = [cell.strip() for cell in line.split("|")]
    # cells[0] is the empty text before the first "|". A usable row needs at
    # least three real cells, and its first cell must start with a digit;
    # header and separator rows fail that check and are skipped.
    if len(cells) < 4 or not cells[1][:1].isdigit():
        continue

    up = Up(uuid=cells[1], content=cells[2], up_name=cells[3])
    all_ups_uuid_list.append(up.uuid)
    group.up_list.append(up)

# Flush the last group, but never append None when the file had no heading.
if group is not None:
    group_list.append(group)

解析视频信息

In [16]:
import asyncio
from bilibili_api import video, user
import random


# https://github.com/Nemo2011/bilibili-api?tab=readme-ov-file
class VideoInfo(BaseModel):
    """The fields of one fetched video entry that this notebook keeps."""

    # Copied from the entry's "title" key; printed as the video line in the report.
    title: str
    # Copied from the entry's "pic" key (presumably the cover image URL - TODO confirm).
    pic: str
    # Copied from the entry's "bvid" key; appended to
    # https://www.bilibili.com/video/ to build the link, and used to find the
    # last-seen video when comparing against the previous snapshot.
    bvid: str

async def get_videos(uuid: str):
    """Fetch one uploader's video listing and keep the relevant entries.

    Args:
        uuid: Uploader id, passed straight to ``user.User``.

    Returns:
        A single-key dict ``{uuid: [VideoInfo, ...]}`` holding, in the order
        returned by the API, every entry whose ``elec_arc_type`` equals 0.
    """
    response = await user.User(uuid).get_videos()

    kept = []
    for entry in response["list"]["vlist"]:
        # NOTE(review): non-zero elec_arc_type presumably marks
        # charging-exclusive videos - confirm against the API docs.
        if entry["elec_arc_type"] != 0:
            continue
        kept.append(
            VideoInfo(title=entry["title"], pic=entry["pic"], bvid=entry["bvid"])
        )
    return {uuid: kept}

tasks = [get_videos(uuid) for uuid in all_ups_uuid_list]

# 间隔，分批请求
uuid_to_videos = {}

for i in range(0, len(tasks), 10):
    print(i)
    batch = tasks[i : i + 10]
    batch_result = await asyncio.gather(*batch)
    for each_result in batch_result:
        uuid_to_videos.update(each_result)
    if i + 15 < len(tasks):
        await asyncio.sleep(random.uniform(1, 2))

0
10
20
30
40
50
60


In [17]:
from datetime import datetime

# Build the file locations used by this run.
now = datetime.now()
dir_path = Path() / "output"
snapshot_dir_path = dir_path / "snapshot"
# Create both directories up front: on a first run they do not exist yet,
# which would make iterdir() below (and the later file writes) fail.
snapshot_dir_path.mkdir(parents=True, exist_ok=True)
# Human-readable markdown report, named after the local date and time.
output_path = dir_path / (now.strftime("%Y-%m-%d-%H-%M-%S") + ".md")
# Machine-readable snapshot used to diff against on the next run, named
# after the integer Unix timestamp.
snapshot_path = snapshot_dir_path / (str(int(now.timestamp())) + ".snapshot")

# All existing snapshots whose name is "<decimal number>.snapshot".
# isdecimal() is used instead of isdigit() because isdigit() also accepts
# characters such as "²" that int() cannot parse.
snapshot_files = [
    f
    for f in snapshot_dir_path.iterdir()
    if f.suffix == ".snapshot" and f.stem.isdecimal()
]
# Newest snapshot = largest numeric file name; None when there is none yet.
latest_snapshot = max(snapshot_files, key=lambda f: int(f.stem), default=None)

根据 之前的 snapshot 和 这次的视频数据，构造完整的 snapshot

原因：

有的 uuid 本次获取到的视频列表可能为空，但之前的 snapshot 中已有该 uuid 的记录；这种情况下沿用之前的记录，避免它从新的 snapshot 中丢失。


In [18]:
# 读取之前的 snapshot 信息
import json

# Restore the previous snapshot (if one exists) as {uuid: VideoInfo}.
if latest_snapshot:
    latest_snapshot_data = latest_snapshot.read_text(encoding="utf-8")
    # The snapshot file is a JSON object mapping uuid -> dumped VideoInfo fields.
    latest_uuid_to_latest_video = json.loads(latest_snapshot_data)
    uuid_to_latest_video = {}
    for stored_uuid, stored_fields in latest_uuid_to_latest_video.items():
        uuid_to_latest_video[stored_uuid] = VideoInfo(**stored_fields)

# Start the new snapshot from this run: the first video of every uploader
# whose fetched list is non-empty.
snapshot_data = {}
for fetched_uuid, fetched_videos in uuid_to_videos.items():
    if fetched_videos:
        snapshot_data[fetched_uuid] = fetched_videos[0]

# Carry over entries from the previous snapshot for uploaders that produced
# nothing this time, so they are not lost.
if latest_snapshot:
    for stored_uuid, stored_video in uuid_to_latest_video.items():
        snapshot_data.setdefault(stored_uuid, stored_video)

# Serialize and write the merged snapshot.
serializable_snapshot = {}
for snapshot_uuid, snapshot_video in snapshot_data.items():
    serializable_snapshot[snapshot_uuid] = snapshot_video.model_dump()
uuid_to_video_json = json.dumps(serializable_snapshot, ensure_ascii=False)
snapshot_path.write_text(uuid_to_video_json, encoding="utf-8")

In [19]:
# Compare against the previous snapshot: for every uploader recorded there,
# keep only the videos listed before the previously recorded one.
if latest_snapshot:
    for up_uuid in uuid_to_videos:
        previous = uuid_to_latest_video.get(up_uuid)
        if previous is None:
            # Uploader not present in the previous snapshot: keep everything.
            continue
        unseen = []
        for candidate in uuid_to_videos[up_uuid]:
            if candidate.bvid == previous.bvid:
                # Reached the recorded video; it and everything after it is dropped.
                break
            unseen.append(candidate)
        # When the recorded video is not in the list at all, the whole list is kept.
        uuid_to_videos[up_uuid] = unseen

# Drop uploaders that have nothing left.
uuid_to_videos = {
    up_uuid: remaining for up_uuid, remaining in uuid_to_videos.items() if remaining
}

In [20]:
# Number of uploaders that still have at least one video left after the
# filtering above (notebook display expression).
len(uuid_to_videos)

3

In [21]:
if uuid_to_videos:
    # There are updates: render them as a markdown report.
    # Layout: "# <group>" once per group that has updates, "## <up name>    <content>"
    # per uploader, then a title line and a link line for each video.
    output_parts = []

    for group in group_list:
        has_write_title = False
        for up in group.up_list:
            video_infos = uuid_to_videos.get(up.uuid)
            if not video_infos:
                # This uploader has nothing new.
                continue

            if not has_write_title:
                # Emit the group heading lazily so groups without updates
                # leave no empty heading in the report.
                output_parts.append(f"# {group.group_name}\n\n")
                has_write_title = True

            output_parts.append(f"## {up.up_name}    {up.content}\n\n")
            output_parts.extend(
                f"{video_info.title}\n"
                f"https://www.bilibili.com/video/{video_info.bvid}\n\n"
                for video_info in video_infos
            )

    # Join once instead of growing a string with repeated "+=".
    output_content = "".join(output_parts)
    # Make sure the target directory exists before writing.
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with open(output_path, "w", encoding="utf-8") as f:
        f.write(output_content)
else:
    print("没有更新！")