In [None]:
import http.server as SimpleHTTPServer
import os
import re
import sys

def copy_byte_range(infile, outfile, start=None, stop=None, bufsize=16*1024):
    """Copy bytes from ``infile`` to ``outfile``, optionally limited to a range.

    Args:
        infile: Binary file object supporting ``seek``, ``tell`` and ``read``.
        outfile: Binary file object supporting ``write``.
        start: Offset to seek to before copying; ``None`` copies from the
            current position.
        stop: Offset of the last byte to copy (inclusive); ``None`` copies
            until end of file.
        bufsize: Maximum number of bytes read per iteration.
    """
    if start is not None:
        infile.seek(start)
    while True:
        if stop is None:
            to_read = bufsize
        else:
            # Compare against None, not truthiness: stop == 0 is a valid
            # "first byte only" request. Clamp at 0 so a position already
            # past ``stop`` never turns into read(-1), which reads to EOF.
            to_read = max(0, min(bufsize, stop + 1 - infile.tell()))
        buf = infile.read(to_read)
        if not buf:
            break
        outfile.write(buf)


# Matches "bytes=<first>-" and "bytes=<first>-<last>"; the last offset is optional.
BYTE_RANGE_RE = re.compile(r'bytes=(\d+)-(\d+)?$')
def parse_byte_range(byte_range):
    """Parse a ``Range`` header value of the form ``bytes=first-[last]``.

    Args:
        byte_range: Raw header value.

    Returns:
        ``(first, last)`` as integers, with ``last`` set to ``None`` when the
        header omits it. A blank header yields ``(None, None)``.

    Raises:
        ValueError: If the value does not match the supported form, or if
            ``last`` is smaller than ``first``.
    """
    if byte_range.strip() == '':
        return None, None

    m = BYTE_RANGE_RE.match(byte_range)
    if not m:
        raise ValueError(f'Invalid byte range {byte_range}')

    first = int(m.group(1))
    last = int(m.group(2)) if m.group(2) is not None else None
    # Test against None rather than truthiness so that last == 0 is validated too.
    if last is not None and last < first:
        raise ValueError(f'Invalid byte range {byte_range}')
    return first, last


class RangeRequestHandler(SimpleHTTPServer.SimpleHTTPRequestHandler):
    """SimpleHTTPRequestHandler that also answers ``Range: bytes=first-[last]`` requests.

    ``send_head`` stores the requested range in ``self.range`` (``None`` when
    the whole file is served) and ``copyfile`` uses it to send only the
    requested bytes.
    """

    def send_head(self):
        """Send the response headers and return the open file, or ``None`` on error.

        Requests without a usable ``Range`` header are delegated to the base
        class. Otherwise a 206 response with ``Content-Range`` is produced,
        400 for a malformed range, 404 when the file cannot be opened and 416
        when the range starts at or beyond the end of the file.
        """
        if 'Range' not in self.headers:
            self.range = None
            return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)
        try:
            self.range = parse_byte_range(self.headers['Range'])
        except ValueError:
            self.send_error(400, 'Invalid byte range')
            return None
        first, last = self.range
        if first is None:
            # A blank Range header parses to (None, None); serve the whole
            # file instead of failing on the comparison with the file length.
            self.range = None
            return SimpleHTTPServer.SimpleHTTPRequestHandler.send_head(self)

        # Mirroring SimpleHTTPServer.py here
        path = self.translate_path(self.path)
        ctype = self.guess_type(path)
        try:
            f = open(path, 'rb')
        except IOError:
            self.send_error(404, 'File not found')
            return None

        try:
            fs = os.fstat(f.fileno())
            file_len = fs[6]
            if first >= file_len:
                # The caller only closes what we return, so close it here.
                f.close()
                self.send_error(416, 'Requested Range Not Satisfiable')
                return None

            self.send_response(206)
            self.send_header('Content-type', ctype)

            if last is None or last >= file_len:
                last = file_len - 1
            response_length = last - first + 1
            # Keep the range used by copyfile() in step with Content-Length.
            self.range = (first, last)

            self.send_header('Content-Range', f'bytes {first}-{last}/{file_len}')
            self.send_header('Content-Length', str(response_length))
            self.send_header('Last-Modified', self.date_time_string(fs.st_mtime))
            self.end_headers()
            return f
        except BaseException:
            # Do not leak the file handle if writing the headers fails.
            f.close()
            raise

    def end_headers(self):
        """Add ``Accept-Ranges: bytes`` to every response before finishing the headers."""
        self.send_header('Accept-Ranges', 'bytes')
        return SimpleHTTPServer.SimpleHTTPRequestHandler.end_headers(self)

    def copyfile(self, source, outputfile):
        """Copy the body: the whole file, or only ``self.range`` when one was requested."""
        if not self.range:
            return SimpleHTTPServer.SimpleHTTPRequestHandler.copyfile(self, source, outputfile)

        start, stop = self.range  # set in send_head()
        copy_byte_range(source, outputfile, start, stop)


def main():
    """Start the range-aware HTTP server.

    The port defaults to 8000 and can be overridden on the command line with
    either ``--port=N`` or ``--port N``; when given more than once, the last
    occurrence wins.
    """
    port = 8000
    prefix = '--port='
    argv = sys.argv

    # Hand-rolled scan of the arguments; anything unrecognised is ignored.
    for index, arg in enumerate(argv[1:], start=1):
        if arg.startswith(prefix):
            port = int(arg[len(prefix):])
        elif arg == '--port' and index + 1 < len(argv):
            port = int(argv[index + 1])

    SimpleHTTPServer.test(HandlerClass=RangeRequestHandler, port=port)


if __name__ == '__main__':
    main()


In [None]:
import qrcode
import requests
import math
import time
import os
import re
import sys
import html
import json
import math
import time
import threading
import subprocess
import xml.etree.ElementTree as ET
from datetime import datetime, timedelta, timezone
import http.cookiejar
import urllib

# 文件保存模块
def file_save(content, file_name, folder=None):
    """Write ``content`` to ``file_name`` as UTF-8 text.

    The file is placed in ``folder`` (resolved against the current working
    directory) when one is given, otherwise directly in the current working
    directory. An existing file is overwritten.
    """
    base_dir = os.getcwd()
    target_dir = os.path.join(base_dir, folder) if folder else base_dir
    with open(os.path.join(target_dir, file_name), "w", encoding="utf-8") as handle:
        handle.write(content)

#日志模块
def write_log(log, suffix = None, display = True, time_display = True):
    """Prepend a timestamped entry to ``log.txt`` and optionally print it.

    Args:
        log: Message text; ANSI colour sequences and newlines are removed
            from the copy written to the file.
        suffix: Optional text appended to the printed line after a ``|``.
        display: When true, the message is also printed.
        time_display: When true, the printed line starts with ``HH:MM:SS|``.
    """
    current_time = datetime.now()
    formatted_time = current_time.strftime("%Y-%m-%d %H:%M:%S")
    # Read the existing log as UTF-8: file_save() writes it as UTF-8, so
    # relying on the platform default encoding can corrupt or fail on non-ASCII text.
    try:
        with open("log.txt", "r", encoding="utf-8") as file:
            contents = file.read()
    except FileNotFoundError:
        contents = ""
    # The newest entry goes first; strip colour codes and newlines so that
    # every entry occupies exactly one line of the file.
    log_in = re.sub(r"\033\[[0-9;]+m", "", log)
    log_in = re.sub(r"\n", "", log_in)
    new_contents = f"{formatted_time} {log_in}\n{contents}"
    file_save(new_contents, "log.txt")
    if display:
        formatted_time_mini = current_time.strftime("%H:%M:%S")
        log_print = f"{formatted_time_mini}|{log}" if time_display else f"{log}"
        log_print = f"{log_print}|{suffix}" if suffix else f"{log_print}"
        print(log_print)

#网址二维码模块
def qr_code(data):
    """Print ``data`` as a QR code drawn with Unicode half-block characters.

    Two module rows are packed into each printed text row: the upper half
    block stands for "top module set", the lower half block for "bottom
    module set", the full block for both and a space for neither.
    """
    qr = qrcode.QRCode(version=1, error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=1, border=0)
    qr.add_data(data)
    qr.make(fit=True)
    matrix = qr.make_image(fill_color="black", back_color="white").modules

    # The matrix is treated as square: its length is used for both dimensions.
    size = len(matrix)
    upper_half, lower_half, full_block, blank = "▀", "▄", "█", " "

    pieces = []
    for top_index in range(0, 2 * math.ceil(size / 2), 2):
        top_row = matrix[top_index]
        bottom_index = top_index + 1
        if bottom_index >= size:
            # Unpaired final row: only the upper half can be set, and no
            # newline is appended after it.
            pieces.extend(upper_half if top_row[x] is True else blank for x in range(size))
            continue
        bottom_row = matrix[bottom_index]
        for x in range(size):
            top, bottom = top_row[x], bottom_row[x]
            if top is True and bottom is True:
                pieces.append(full_block)
            elif top is True and bottom is False:
                pieces.append(upper_half)
            elif top is False and bottom is True:
                pieces.append(lower_half)
            else:
                pieces.append(" ")
        pieces.append("\n")
    print("".join(pieces))

# 申请bilibili二维码并获取token和URL模块
def request_qr_code():
    """Request a login QR code and return ``(qrcode_key, url)`` from the response."""
    generate_url = 'https://passport.bilibili.com/x/passport-login/web/qrcode/generate'
    payload = requests.get(generate_url, timeout = 5).json()['data']
    return payload['qrcode_key'], payload['url']

# 扫码登录bilibili并返回状态和cookie模块
def scan_login(token):
    """Poll the QR-code login endpoint once for ``token``.

    The session's cookies are kept in a Mozilla-format jar backed by
    ``yt_dlp_bilibili.txt``, which is saved and then reloaded after the
    request.

    Returns:
        ``(code, cookies, refresh_token)`` where ``code`` and
        ``refresh_token`` come from the JSON body and ``cookies`` are the
        cookies set by this response.
    """
    jar = http.cookiejar.MozillaCookieJar("yt_dlp_bilibili.txt")
    session = requests.Session()
    session.cookies = jar

    poll_url = f'https://passport.bilibili.com/x/passport-login/web/qrcode/poll?qrcode_key={token}'
    response = session.get(poll_url, timeout = 5)

    # Persist the jar to disk, then read it back.
    jar.save()
    jar.load()

    payload = response.json()['data']
    return payload['code'], response.cookies, payload['refresh_token']

# 登陆bilibili模块
def bilibili_login():
    """Run the QR-code login flow and wait until it succeeds or the code expires.

    A QR code is requested and printed, then the login state is polled once
    per second. Status changes are printed on a single console line.

    Returns:
        ``(cookies, refresh_token)`` when the login succeeds (status ``0``),
        or ``(message, refresh_token)`` where ``message`` is the red
        "QR code expired" text when status ``86038`` is reported.
    """
    token, url = request_qr_code()
    print(f"{datetime.now().strftime('%H:%M:%S')}|请用Bilibili App扫描登录:")
    qr_code(url)
    # Console message for every status that changes what the user sees.
    status_messages = {
        86038: '\033[31m二维码失效超时, 请重新运行\033[0m',
        86090: '\033[32m扫描成功\033[0m',
        0: '\033[32m登陆成功\033[0m',
    }
    time_print = f"{datetime.now().strftime('%H:%M:%S')}|Bilibili "
    # Initialised up front so an unexpected first status cannot hit an unbound name.
    login_status = ""
    login_status_change = ""
    while True:
        status, cookie, refresh_token = scan_login(token)
        # 86101 (presumably "not scanned yet") and unknown codes keep the
        # previous message and fall through to the sleep below, so the API
        # is polled once per second instead of in a tight loop.
        login_status = status_messages.get(status, login_status)
        if login_status_change != login_status:
            print(f"\r{time_print}{login_status}", end = "")
        login_status_change = login_status
        if status == 86038:
            print("")
            return login_status, refresh_token
        elif status == 0:
            print("")
            return cookie, refresh_token
        time.sleep(1)

# 保存bilibili登陆成功后的cookies模块
def save_bilibili_cookies():
    """Log in via QR code and store the resulting cookies in ``bilibili_cookies.txt``.

    When the login helper reports the "QR code expired" message, the failure
    is logged and the process exits. Otherwise the cookies are converted to a
    dict, extended with ``buvid3`` (the ``b_3`` value from the fingerprint
    endpoint) and the refresh token, and written as JSON.
    """
    login_result, refresh_token = bilibili_login()

    # The login helper signals failure by returning this exact message.
    if login_result == '\033[31m二维码失效超时, 请重新运行\033[0m':
        write_log("Bilibili \033[31m登陆失败\033[0m")
        sys.exit(0)

    cookie_dict = requests.utils.dict_from_cookiejar(login_result)
    fingerprint = requests.get('https://api.bilibili.com/x/frontend/finger/spi', timeout = 5).json()
    cookie_dict["buvid3"] = fingerprint["data"]["b_3"]
    cookie_dict["refresh_token"] = refresh_token
    file_save(json.dumps(cookie_dict, ensure_ascii=False), "bilibili_cookies.txt")

save_bilibili_cookies()

In [None]:
from functools import reduce
from hashlib import md5
import urllib.parse
import time
import requests
import json

# Index table used to reorder the characters of img_key + sub_key.
mixinKeyEncTab = [
    46, 47, 18, 2, 53, 8, 23, 32, 15, 50, 10, 31, 58, 3, 45, 35, 27, 43, 5, 49,
    33, 9, 42, 19, 29, 28, 14, 39, 12, 38, 41, 13, 37, 48, 7, 16, 24, 55, 40,
    61, 26, 17, 0, 1, 60, 51, 30, 4, 22, 25, 54, 21, 56, 59, 6, 63, 57, 62, 11,
    36, 20, 34, 44, 52
]

def getMixinKey(orig: str):
    """Shuffle the characters of ``orig`` via ``mixinKeyEncTab`` and keep the first 32."""
    shuffled = ''.join(orig[index] for index in mixinKeyEncTab)
    return shuffled[:32]

def encWbi(params: dict, img_key: str, sub_key: str):
    """Return a signed copy of ``params`` carrying ``wts`` and ``w_rid``.

    Args:
        params: Request parameters to sign. The mapping is not modified.
        img_key: First half of the key material.
        sub_key: Second half of the key material.

    Returns:
        A new dict with keys sorted, every value converted to a string with
        the characters ``!'()*`` removed, the current Unix time under
        ``wts``, and the MD5 signature under ``w_rid``.
    """
    mixin_key = getMixinKey(img_key + sub_key)
    # Work on a copy so the caller's dict does not silently gain a 'wts' entry.
    signed = dict(params)
    signed['wts'] = round(time.time())
    # Sort by key and strip the characters "!'()*" from every value.
    signed = {
        key: ''.join(ch for ch in str(value) if ch not in "!'()*")
        for key, value
        in sorted(signed.items())
    }
    query = urllib.parse.urlencode(signed)
    # w_rid is the MD5 of the serialized query followed by the mixin key.
    signed['w_rid'] = md5((query + mixin_key).encode()).hexdigest()
    return signed

def getWbiKeys() -> tuple[str, str]:
    """Fetch the current ``img_key`` and ``sub_key`` from the nav endpoint.

    Each key is the file name, without directory and extension, of the
    corresponding ``wbi_img`` URL in the response.

    Raises:
        requests.HTTPError: If the endpoint answers with an error status.
    """
    # Bounded wait, matching the other requests in this notebook; without a
    # timeout a stalled connection would block the cell indefinitely.
    resp = requests.get('https://api.bilibili.com/x/web-interface/nav', timeout=5)
    resp.raise_for_status()
    wbi_img = resp.json()['data']['wbi_img']
    img_url: str = wbi_img['img_url']
    sub_url: str = wbi_img['sub_url']
    img_key = img_url.rsplit('/', 1)[1].split('.')[0]
    sub_key = sub_url.rsplit('/', 1)[1].split('.')[0]
    return img_key, sub_key

# Sign the query parameters with the freshly fetched keys.
img_key, sub_key = getWbiKeys()

signed_params = encWbi(
    params={
        'mid': '326499679'
    },
    img_key=img_key,
    sub_key=sub_key
)
query = urllib.parse.urlencode(signed_params)

# The cookie file is written as UTF-8 JSON with ensure_ascii=False, so read
# it back with the same encoding instead of the platform default.
with open('bilibili_cookies.txt', 'r', encoding='utf-8') as file:
    cookies = json.load(file)

user_agent = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36'}
url = "https://api.bilibili.com/x/space/wbi/arc/search"
# Bounded wait so a stalled connection cannot hang the cell.
response = requests.get(f"{url}?{query}", headers = user_agent, cookies = cookies, timeout = 5)
response = response.json()
print(response)

In [None]:
# Check whether the cookies need to be refreshed.
user_agent = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
url = "https://passport.bilibili.com/x/passport-login/web/cookie/info"
response = requests.get(url, headers=user_agent, cookies=cookies).json()
print(response)

In [None]:
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Hash import SHA256
import binascii
import time
import re

# RSA public key used to encrypt the refresh timestamp.
key = RSA.importKey('''\
-----BEGIN PUBLIC KEY-----
MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDLgd2OAkcGVtoE3ThUREbio0Eg
Uc/prcajMKXvkCKFCWhJYJcLkcM2DKKcSeFpD/j6Boy538YXnR6VhcuUJOhH2x71
nzPjfdTcqMz7djHum0qSZA0AyCBDABUqCrfNgCiJ00Ra7GmRj+YCK1NJEuewlb40
JNrRuoEUXpabUzGB8QIDAQAB
-----END PUBLIC KEY-----''')

def getCorrespondPath(ts):
    """Encrypt ``refresh_<ts>`` with RSA-OAEP (SHA-256) and return it hex-encoded."""
    plaintext = f'refresh_{ts}'.encode()
    ciphertext = PKCS1_OAEP.new(key, SHA256).encrypt(plaintext)
    return ciphertext.hex()

# Millisecond timestamp that gets encrypted into the request path.
ts = round(time.time() * 1000)
user_agent = {
    'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36',
}
url = f"https://www.bilibili.com/correspond/1/{getCorrespondPath(ts)}"
response = requests.get(url, headers=user_agent, cookies=cookies)

# Pull the content of the <div id="1-name"> element out of the returned page.
# NOTE(review): `value` stays undefined when the element is missing.
match = re.search(r'<div id="1-name">(.+?)</div>', response.text)
if match is not None:
    value = match[1]
    print(value)

In [None]:
# Debug output of the stored bili_jct and refresh_token cookie entries.
# NOTE(review): these look like credentials - clear this cell's output before sharing.
print(cookies["bili_jct"])
print(cookies["refresh_token"])

In [None]:
# POST the refresh request using values from the stored cookies and the
# `value` extracted from the correspond page.
url = 'https://passport.bilibili.com/x/passport-login/web/cookie/refresh'
data = dict(
    csrf=cookies["bili_jct"],
    refresh_csrf=value,
    source='main_web',
    refresh_token=cookies["refresh_token"],
)
response = requests.post(url, data=data, cookies=cookies)

In [None]:
# Show the raw body of the last response and the cookies it set, as a plain dict.
print(response.text)
print(requests.utils.dict_from_cookiejar(response.cookies))

In [None]:
import os

# The CSRF token, refresh token and SESSDATA cookie are credentials, so they
# are read from the environment instead of being committed in the notebook.
# Set BILIBILI_CSRF, BILIBILI_REFRESH_TOKEN and BILIBILI_SESSDATA before
# running this cell; a missing variable raises KeyError.
# NOTE(review): SESSDATA was previously supplied in its URL-encoded form
# (containing %2C / %2A) - keep providing it that way.
url = 'https://passport.bilibili.com/x/passport-login/web/confirm/refresh'
data = {
    'csrf': os.environ['BILIBILI_CSRF'],
    'refresh_token': os.environ['BILIBILI_REFRESH_TOKEN']
}
cookies = {
    'SESSDATA': os.environ['BILIBILI_SESSDATA']
}
response_1 = requests.post(url, data=data, cookies=cookies)
print(response_1.text)

In [None]:
import requests

# User-Agent header sent with the request.
user_agent = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/116.0.0.0 Safari/537.36'}
#user_agent = {'User-Agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36'}

# Search URL with a fixed wts / w_rid pair baked in, requested without cookies.
url = "https://api.bilibili.com/x/space/wbi/arc/search?mid=326499679&wts=1699479437&w_rid=8c044bbe24de21e1fdc190d284d362d0"
response = requests.get(url, headers=user_agent, timeout=5).json()
print(response)

In [None]:
# Fetch the fingerprint payload once and print its b_3 value; reusing the
# response avoids sending the identical request a second time.
response = requests.get('https://api.bilibili.com/x/frontend/finger/spi', timeout = 5)
print(response.json()["data"]["b_3"])

In [None]:
# HTTP GET请求重试模块
def http_get(url, name, max_retries=10, retry_delay=6, headers_possess=False):
    """GET ``url`` with retries and return the response.

    Args:
        url: Address to request.
        name: Label used in the progress messages.
        max_retries: Maximum number of attempts.
        retry_delay: Seconds to wait after a failed attempt.
        headers_possess: When true, a browser User-Agent header is sent.

    Returns:
        The response of the first successful attempt, or a 404 response as
        soon as one is received. When every attempt fails, the most recent
        response object obtained (``None`` if there never was one).
    """
    user_agent = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
    }
    err = ""  # text of the most recent failure, shown once retries are exhausted
    response = None
    request_kwargs = {"timeout": 5}
    if headers_possess:
        request_kwargs["headers"] = user_agent
    for num in range(max_retries):
        try:
            response = requests.get(url, **request_kwargs)
            response.raise_for_status()
        except Exception as e:
            # A 404 is handed back to the caller instead of being retried.
            if response is not None and response.status_code in {404}:
                return response
            print(
                f"{datetime.now().strftime('%H:%M:%S')}|{name}|\033[31m连接异常重试中...\033[97m{num + 1}\033[0m"
            )
            # Always remember the latest error so the final message can
            # report it (previously the text was never stored).
            err = f":\n{str(e)}"
        else:
            return response
        time.sleep(retry_delay)
    print(
        f"{datetime.now().strftime('%H:%M:%S')}|{name}|\033[31m达到最大重试次数\033[97m{max_retries}\033[0m{err}"
    )
    return response

In [None]:
import requests
from datetime import datetime
import time

# HTTP GET请求重试模块
def http_get(url, name, max_retries=10, retry_delay=6, headers_possess=False, cookies=None, data=None, cookie_jar_name=None):
    """GET ``url`` through a session with retries and return the response.

    Args:
        url: Address to request.
        name: Label used in the progress messages.
        max_retries: Maximum number of attempts.
        retry_delay: Seconds to wait after a failed attempt.
        headers_possess: When true, a browser User-Agent header is sent.
        cookies: Optional cookies added to the session.
        data: Optional mapping added to the session's query parameters.
        cookie_jar_name: When given, the session stores its cookies in a
            Mozilla-format jar bound to ``<cookie_jar_name>.txt``.

    Returns:
        The response of the first successful attempt, or a 404 response as
        soon as one is received. When every attempt fails, the most recent
        response object obtained (``None`` if there never was one).
    """
    # Imported here because this cell does not import it itself.
    import http.cookiejar

    user_agent = {
        "User-Agent": "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/118.0.0.0 Safari/537.36"
    }
    err = ""  # text of the most recent failure, shown once retries are exhausted
    response = None
    session = requests.Session()
    if cookie_jar_name:
        # Bind a file-backed cookie jar to the session.
        cookie_jar = http.cookiejar.MozillaCookieJar(f"{cookie_jar_name}.txt")
        session.cookies = cookie_jar
    if headers_possess:
        session.headers.update(user_agent)
    if cookies:
        session.cookies.update(cookies)
    if data:
        session.params.update(data)
    for num in range(max_retries):
        try:
            response = session.get(url, timeout=5)
            response.raise_for_status()
        except Exception as e:
            # A 404 is handed back to the caller instead of being retried.
            if response is not None and response.status_code in {404}:
                return response
            print(
                f"{datetime.now().strftime('%H:%M:%S')}|{name}|\033[31m连接异常重试中...\033[97m{num + 1}\033[0m"
            )
            # Always remember the latest error so the final message can
            # report it (previously the text was never stored).
            err = f":\n{str(e)}"
        else:
            return response
        time.sleep(retry_delay)
    print(
        f"{datetime.now().strftime('%H:%M:%S')}|{name}|\033[31m达到最大重试次数\033[97m{max_retries}\033[0m{err}"
    )
    return response

In [6]:
import zipfile
from datetime import datetime, timedelta, timezone  

# xml备份保存模块
def backup_zip_save(file_content):
    """Append ``file_content`` to ``backup_xml.zip`` under a timestamped name.

    The entry is named ``YYYYmmddHHMMSS.xml``. If that name already exists in
    the archive, a message is printed and ``_1``, ``_2``, ... is appended to
    the timestamp until an unused name is found, so the call returns without
    waiting and never overwrites an existing entry.
    """
    def timestamped_name(attempt):
        # Current time down to the second, plus a counter after a collision.
        stamp = datetime.now().strftime("%Y%m%d%H%M%S")
        return f"{stamp}.xml" if attempt == 0 else f"{stamp}_{attempt}.xml"

    compress_file_name = "backup_xml.zip"
    attempt = 0
    # Open the archive once in append mode (it is created when missing).
    with zipfile.ZipFile(compress_file_name, 'a') as zipf:
        existing = set(zipf.namelist())
        while True:
            # The helper keeps its own name, so retrying can call it again
            # (rebinding it to its result made the second call fail).
            file_name = timestamped_name(attempt)
            if file_name not in existing:
                zipf.writestr(file_name, file_content)
                return
            print(f"文件 '{file_name}' 已存在于压缩包中")
            attempt += 1



In [3]:
from datetime import datetime, timedelta, timezone  

# Capture the current local time.
current_time = datetime.now()
# Render it as a compact timestamp: year, month, day, hour, minute, second.
formatted_time = f"{current_time:%Y%m%d%H%M%S}"


print(formatted_time)

20240220022637


In [1]:
import re


# Read the whole feed file into memory.
with open("YouTube.xml", "r", encoding="utf-8") as file:
    rss_original = file.read()

# Matches a span that starts at a channel marker comment, reaches the
# <guid>Wg2PDnBFQHY</guid> element without crossing another marker, and
# runs on to the next marker comment.
pattern_youtube_fail_item = r'<!-- UCKGlVUb95XTrCB8pNh2IUYw -->(?:(?!<!-- UCKGlVUb95XTrCB8pNh2IUYw -->).)+?<guid>Wg2PDnBFQHY</guid>.+?<!-- UCKGlVUb95XTrCB8pNh2IUYw -->'

# DOTALL lets "." cross line breaks, since the item spans many lines.
overall_rss = re.compile(pattern_youtube_fail_item, flags=re.DOTALL).findall(rss_original)

print(overall_rss[0])

<!-- UCKGlVUb95XTrCB8pNh2IUYw -->
        <item>
            <guid>Wg2PDnBFQHY</guid>
            <title>23歲的美國視頻直播主 憑藉著奇葩的三觀和滿滿的惡意 成為了所有日本人眼中“最討厭的人”</title>
            <link>https://youtube.com/watch?v=Wg2PDnBFQHY</link>
            <description>『謎案追蹤』</description>
            <pubDate>Mon, 02 Oct 2023 19:40:21 +0800</pubDate>
            <enclosure url="http://127.0.0.1:8000/channel_audiovisual/UCKGlVUb95XTrCB8pNh2IUYw/Wg2PDnBFQHY.m4a" length="28321265" type="audio/x-m4a"></enclosure>
            <itunes:author>23歲的美國視頻直播主 憑藉著奇葩的三觀和滿滿的惡意 成為了所有日本人眼中“最討厭的人”</itunes:author>
            <itunes:subtitle>23歲的美國視頻直播主 憑藉著奇葩的三觀和滿滿的惡意 成為了所有日本人眼中“最討厭的人”</itunes:subtitle>
            <itunes:summary><![CDATA[『謎案追蹤』]]></itunes:summary>
            <itunes:image href="https://i4.ytimg.com/vi/Wg2PDnBFQHY/hqdefault.jpg"></itunes:image>
            <itunes:duration>29:11</itunes:duration>
            <itunes:explicit>no</itunes:explicit>
            <itunes:order>1</itunes:order>
        </item>
<