#### 自动打开网页

In [2]:
import webbrowser
webbrowser.open('https://www.google.com/maps/place/your_address_string')


True

### requests.get() 下载网页

In [9]:
import requests

# 发送请求并获取响应
res = requests.get('https://automatetheboringstuff.com/files/rj.txt')

# 检查请求状态
if res.status_code == requests.codes.ok:
    print("Download successful!")
    print("Length of downloaded text:", len(res.text))
    print("First 250 characters of the text:\n", res.text[:250])
else:
    print("Failed to download the page.")


Download successful!
Length of downloaded text: 178978
First 250 characters of the text:
 The Project Gutenberg EBook of Romeo and Juliet, by William Shakespeare

This eBook is for the use of anyone anywhere at no cost and with
almost no restrictions whatsoever.  You may copy it, give it away or
re-use it under the terms of the Projec


### 错误检查

import requests

res = requests.get('http://inventwithpython.com/page_that_does_not_exist')

# 检查请求是否成功
try:
    res.raise_for_status()
except Exception as exc:
    print('There was a problem: %s' % (exc))


In [127]:
import requests

def request_get():
    url = "https://jsonplaceholder.typicode.com/posts"
    response = requests.get(url)
    if response.status_code == 200:
        print("GET 请求成功:", response.json())
    else:
        print("GET 请求失败:", response.status_code, response.text)

request_get()

GET 请求成功: [{'userId': 1, 'id': 1, 'title': 'sunt aut facere repellat provident occaecati excepturi optio reprehenderit', 'body': 'quia et suscipit\nsuscipit recusandae consequuntur expedita et cum\nreprehenderit molestiae ut ut quas totam\nnostrum rerum est autem sunt rem eveniet architecto'}, {'userId': 1, 'id': 2, 'title': 'qui est esse', 'body': 'est rerum tempore vitae\nsequi sint nihil reprehenderit dolor beatae ea dolores neque\nfugiat blanditiis voluptate porro vel nihil molestiae ut reiciendis\nqui aperiam non debitis possimus qui neque nisi nulla'}, {'userId': 1, 'id': 3, 'title': 'ea molestias quasi exercitationem repellat qui ipsa sit aut', 'body': 'et iusto sed quo iure\nvoluptatem occaecati omnis eligendi aut ad\nvoluptatem doloribus vel accusantium quis pariatur\nmolestiae porro eius odio et labore et velit aut'}, {'userId': 1, 'id': 4, 'title': 'eum et est occaecati', 'body': 'ullam et saepe reiciendis voluptatem adipisci\nsit amet autem assumenda provident rerum culpa\n

In [136]:
import requests

url = "https://jsonplaceholder.typicode.com/posts"

def request_post():
    
    headers = {
        'Content-type': 'application/json; charset=UTF-8'
    }
    data = {
        "title": "foo",
        "body": "bar",
        "userId": 1
    }
    response = requests.post(url, json=data, headers=headers)
    if response.status_code == 201:
        print("POST 请求成功:", response.json())
    else:
        print("POST 请求失败:", response.status_code, response.text)

request_post()


POST 请求成功: {'title': 'foo', 'body': 'bar', 'userId': 1, 'id': 101}


In [139]:
# PUT 请求
import requests

url = "https://jsonplaceholder.typicode.com/posts"

def request_put():
   
    update_url = f"{url}/1"  # 更新 ID 为 1 的数据
    data = {
        "id": 1,
        "title": "updated title",
        "body": "updated body",
        "userId": 1
    }
    response = requests.put(update_url, json=data)
    print("Response Text:", response.text)
    if response.status_code == 200:
        print("PUT 请求成功:", response.json())
    else:
        print("PUT 请求失败:", response.status_code)
        
request_put()

Response Text: {
  "id": 1,
  "title": "updated title",
  "body": "updated body",
  "userId": 1
}
PUT 请求成功: {'id': 1, 'title': 'updated title', 'body': 'updated body', 'userId': 1}


In [141]:
# DELETE 请求
import requests

url = "https://jsonplaceholder.typicode.com/posts"

def request_delete():
    delete_url = f"{url}/1"  # 删除 ID 为 1 的数据
    response = requests.delete(delete_url)
    print("Response Text:", response.text)
    if response.status_code == 200:
        print("DELETE 请求成功:", response.json())
    else:
        print("DELETE 请求失败:", response.status_code)
        
request_delete()

Response Text: {}
DELETE 请求成功: {}


### 下载文件


In [10]:
#  写入二进制数据
import requests

# 下载网页内容
res = requests.get('https://automatetheboringstuff.com/files/rj.txt')
res.raise_for_status()

# 以写二进制模式打开文件
playFile = open('RomeoAndJuliet.txt', 'wb')

# 使用 iter_content() 方法分块写入文件
for chunk in res.iter_content(100000):
    playFile.write(chunk)

playFile.close()


ProxyError: HTTPSConnectionPool(host='automatetheboringstuff.com', port=443): Max retries exceeded with url: /files/rj.txt (Caused by ProxyError('Unable to connect to proxy', SSLError(SSLZeroReturnError(6, 'TLS/SSL connection has been closed (EOF) (_ssl.c:1149)'))))

## HTML

<!DOCTYPE html>
<html lang="en">
<head>
    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1.0" />
    <title>My First Web Page with JavaScript</title>
    <meta name="apple-mobile-web-app-title" content="My Web Page">
    <meta name='robots' content='max-image-preview:large' />
    <link rel="canonical" href="https://www.example.com" />
    <link rel="stylesheet" href="https://lf3-cdn-tos.bytecdntp.com/cdn/expire-1-M/font-awesome/4.7.0/css/font-awesome.min.css" media="all" />
    <script src="https://lf3-cdn-tos.bytecdntp.com/cdn/expire-1-M/jquery/2.0.3/jquery.min.js"></script>
    <link rel="apple-touch-icon" href="https://static.jyshare.com/images/icon/mobile-icon.png" />
</head>
<body>

    <!-- 页面内容 -->
    <h1>This is my first web page with JavaScript!</h1>
    <p>Click the button below to change the text of the heading.</p>
    <button onclick="changeHeadingText()">Click Me</button>

    <!-- 内联 JavaScript -->
    <script>
        // 当点击按钮时，修改标题文本
        function changeHeadingText() {
            document.querySelector('h1').textContent = 'Hello, JavaScript is working!';
        }

        // 另一个 JavaScript 函数：改变页面背景色
        function changeBackgroundColor() {
            document.body.style.backgroundColor = 'lightblue';
        }

        // 页面加载后改变背景色
        window.onload = function() {
            changeBackgroundColor();
        };
    </script>

    <!-- 引入外部 JavaScript 文件 -->
    <script src="script.js"></script>

</body>
</html>


### 使用 BeautifulSoup 抓取数据

In [21]:
import requests
from bs4 import BeautifulSoup

# 请求天气页面
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
url = 'http://weather.gov/'
response = requests.get(url,headers=headers)
soup = BeautifulSoup(response.text, 'html.parser')

# 使用类名查找温度信息
temperature = soup.find('p', class_='myforecast-current-lrg').text
print(temperature)


ProxyError: HTTPSConnectionPool(host='weather.gov', port=443): Max retries exceeded with url: / (Caused by ProxyError('Unable to connect to proxy', SSLError(SSLZeroReturnError(6, 'TLS/SSL connection has been closed (EOF) (_ssl.c:1149)'))))

In [23]:
import requests, bs4

# 使用 requests 获取页面内容
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}
res = requests.get('http://nostarch.com',headers=headers)
res.raise_for_status()
noStarchSoup = bs4.BeautifulSoup(res.text, 'html.parser')  # 创建 BeautifulSoup 对象
print(type(noStarchSoup))  # <class 'bs4.BeautifulSoup'>


HTTPError: 403 Client Error: Forbidden for url: https://nostarch.com/

### 使用 select() 方法查找元素
select() 方法允许你使用 CSS 选择器来选择 HTML 元素。它返回一个包含所有匹配元素的列表。
| 选择器                  | 匹配内容                |
|-------------------------|-------------------------|
| soup.select('div')       | 所有 `<div>` 元素        |
| soup.select('#author')   | id 为 author 的元素      |
| soup.select('.notice')   | 类名为 notice 的元素     |
| soup.select('div > span')| `<div>` 元素下的直接 `<span>` 元素 |


In [None]:
elems = noStarchSoup.select('#author')
print(len(elems))  # 输出匹配到的元素数量
print(type(elems[0]))  # <class 'bs4.element.Tag'>
print(elems[0].getText())  # 获取元素的文本内容：'Al Sweigart'
print(str(elems[0]))  # 输出完整的 HTML：'<span id="author">Al Sweigart</span>'
print(elems[0].attrs)  # 输出元素的属性：{'id': 'author'}


In [24]:
import bs4

# 解析 example.html 文件
soup = bs4.BeautifulSoup(open('example.html'))

# 获取第一个 <span> 元素
spanElem = soup.select('span')[0]

# 打印 <span> 元素的字符串表示
print(str(spanElem))  # '<span id="author">Al Sweigart</span>'

# 获取 'id' 属性值
print(spanElem.get('id'))  # 'author'

# 尝试获取不存在的属性，返回 None
print(spanElem.get('some_nonexistent_attr') == None)  # True

# 获取元素的所有属性
print(spanElem.attrs)  # {'id': 'author'}


FileNotFoundError: [Errno 2] No such file or directory: 'example.html'

In [21]:

import requests
import re
import json
from bs4 import BeautifulSoup

# 模拟获取抖音视频 URL，以下是一个示例链接
# video_url = 'https://www.douyin.com'  # 替换为实际的抖音视频链接
video_url = 'https://v.qq.com/'

# 设置请求头
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
}

# 获取视频页面的 HTML
response = requests.get(video_url, headers=headers)
html_content = response.text
soup = BeautifulSoup(html_content, 'html.parser')
#print(soup.prettify())

divs = soup.find_all("div", class_="title")


print(divs)

texts = [div.text.strip() for div in divs]

print(texts)

#match = re.search(r'window\.__INIT_DATA__ = ({.*?});', html_content)
# if match:
#  data = json.loads(match.group(1))
#   print(data)

[<div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 西北岁月 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 尊享黑金战舰 登录可得 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 永夜星河 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 令人心动的offer 第6季 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> TapTap </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 剑来 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 好团圆 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 双城之战2·中文主题曲 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 锦绣安宁 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;"> 现在就出发2 </div>, <div class="title" data-acttype="1003" data-v-be042853="" style="display:;

### 获取抖音分享视频的详细信息

In [67]:
import requests
import re
import json
from bs4 import BeautifulSoup

HEADERS = {
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'pragma': 'no-cache',
    'cache-control': 'no-cache',
    'upgrade-insecure-requests': '1',
    'user-agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372   Safari/604.1",
}

def get_real_address(url):
    if url.find('v.douyin.com') < 0:
        return url
    res = requests.get(url, headers=HEADERS, allow_redirects=False)
    if res.status_code == 302:
        long_url = res.headers['Location']
        HEADERS['Referer'] = long_url
        return long_url
    return None
    
url      = get_real_address('https://v.douyin.com/iA2XSLA5/')
print(url)
response = requests.get(url,headers=HEADERS)
html_content = response.text
soup = BeautifulSoup(html_content, 'html')
print(soup.prettify())

# 改进的正则表达式，去掉了 ';'
pattern = r'window\._ROUTER_DATA\s*=\s*(\{.*\})\s*;?'

# 尝试匹配数据
match = re.search(pattern, html_content)
if match:
    # 提取 JSON 字符串（大括号中的内容）
    router_data_str = match.group(1)  # 提取大括号中的内容
    print("提取到的 JSON 数据:", router_data_str)
    # 使用 json.loads() 解析 JSON 数据
    try:
        router_data = json.loads(router_data_str)
        print("解析后的 Python 字典:", router_data)
    except json.JSONDecodeError as e:
        print("解析 JSON 时出错:", e)
else:
    print("没有找到 window._ROUTER_DATA 数据")


https://www.iesdouyin.com/share/video/7427027349822016805/?region=CN&mid=7427027693159385866&u_code=0&did=MS4wLjABAAAA_3zUtK5KVDiCrVRUL29i2Wq9vGP0t80ov17h1pwjZXgarcqqw47Wnhvg2rxqdSbx&iid=MS4wLjABAAAANwkJuWIRFOzg5uCpDRpMj4OX-QryoDgn-yYlXQnRwQQ&with_sec_did=1&titleType=title&share_sign=r_vAcILc7cf08OJN_uUGtC7nxzRz0qM2zZgvKgdXtPo-&share_version=170400&ts=1730971519&from_aid=6383&from_ssr=1&from=web_code_link
<!DOCTYPE html>
<html>
 <head>
  <script nonce="argus-csp-token">
   var e=function(e,t,a){if(Math.ceil(100*Math.random())<=100*t){var o="1243",d="16720",n={ev_type:"batch",list:[{ev_type:"custom",payload:{name:"sdk_glue_load",type:"event",metrics:{},categories:{sdk_glue_load_status:e,sdk_glue_load_err_src:a,payload_bdms_aid:o,payload_bdms_page_id:d}},common:{context:{ctx_bdms_aid:o,ctx_bdms_page_id:d,ctx_aqa:1},bid:"web_bdms_cn",pid:window.location.pathname,view_id:"/_1",user_id:"",session_id:"0-a-1-2-c",release:"",env:"production",url:window.location.href,timestamp:+new Date,sdk_ver

### BeautifulSoup获取抖音分享视频的详细信息

In [90]:
import requests
import os
import sys
import copy
import urllib.parse
import urllib.request
from urllib.parse import urlencode
from bs4 import BeautifulSoup
import json


TIMEOUT = 10

RETRY = 5

# 请求网页

HEADERS = {
    'accept-encoding': 'gzip, deflate, br',
    'accept-language': 'zh-CN,zh;q=0.9',
    'pragma': 'no-cache',
    'cache-control': 'no-cache',
    'upgrade-insecure-requests': '1',
    'user-agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372   Safari/604.1",
}

def getRemoteFileSize(url, proxy=None):
    #'''
    # 通过content-length头获取远程文件大小
    # '''
    try:
        request = urllib.request.Request(url)
        request.get_method = lambda: 'HEAD'
        response = urllib.request.urlopen(request)
        response.read()
    except urllib.error.HTTPError as e:
        # 远程文件不存在
        print(e.code)
        print(e.read().decode("utf8"))
        return 0
    else:
        fileSize = dict(response.headers).get('Content-Length', 0)
        return int(fileSize)
        
def download(uri, description, target_folder):
    headers = copy.deepcopy(HEADERS)
    file_name = description + '.mp4'
    headers['user-agent'] = 'Aweme/63013 CFNetwork/978.0.7 Darwin/18.6.0'
    file_path = os.path.join(target_folder, file_name)

    # 检查文件是否已经存在并与远程文件大小相同
    if os.path.isfile(file_path):
        remoteSize = getRemoteFileSize(uri)
        localSize = os.path.getsize(file_path)
        print(f"Remote size: {remoteSize}, Local size: {localSize}")
        if remoteSize == localSize:
            return  # 文件已经存在且大小相同，不需要重新下载

    print(f"Downloading {file_name} from {uri}\n")
    retry_times = 0
    while retry_times < RETRY:
        try:
            resp = requests.get(uri, headers=headers, stream=True, timeout=TIMEOUT)
            if resp.status_code == 403:
                print(f"Access Denied when retrieving {uri}")
                raise Exception("Access Denied")

            # 开始下载文件
            with open(file_path, 'wb') as fh:
                for chunk in resp.iter_content(chunk_size=1024):
                    fh.write(chunk)
            print(f"Download completed for {file_name}\n")
            break  # 下载成功，跳出循环
        except requests.exceptions.RequestException as e:
            print(f"Attempt {retry_times + 1} failed with error: {e}")
        retry_times += 1
        time.sleep(1)  # 等待1秒再重试
    else:
        # 所有尝试均失败，删除文件（如果已部分下载）
        try:
            os.remove(file_path)
        except OSError:
            pass
        print(f"Failed to retrieve {uri} after {RETRY} attempts.\n")
    
def get_real_address(url):
    if url.find('v.douyin.com') < 0:
        return url
    res = requests.get(url, headers=HEADERS, allow_redirects=False)
    if res.status_code == 302:
        long_url = res.headers['Location']
        HEADERS['Referer'] = long_url
        return long_url
    return None
    
def fetch_dounyin_web():
    url      = get_real_address('https://v.douyin.com/iA2XSLA5/')
    print(url)
    response = requests.get(url,headers=HEADERS)
    html_content = response.text
    # 解析 HTML
    soup = BeautifulSoup(html_content, 'html.parser')
    # print(soup.prettify())
    
    # 查找包含 window._ROUTER_DATA 的 <script> 标签
    script_tag = soup.find('script', string=lambda t: t and 'window._ROUTER_DATA' in t)
    
    # print(script_tag)
    
    if script_tag:
        # 提取 window._ROUTER_DATA 的内容
        start = script_tag.string.find('window._ROUTER_DATA = ') + len('window._ROUTER_DATA = ')
        end = script_tag.string.find('</script>', start)
        router_data_str = script_tag.string[start:end].strip()
         # 如果缺少 '}'，手动加上
        if not router_data_str.endswith('}'):
            router_data_str += '}'
        print("window._ROUTER_DATA:",router_data_str)
         # 清理非JSON字符（如单引号替换为双引号）
        router_data_str = router_data_str.replace("'", '"')
        
        try:
            # 将字符串解析为 Python 对象
            router_data = json.loads(router_data_str)
    
            # 打印获取到的数据
            print("Parsed JSON:", router_data)
            
            video_data = router_data["loaderData"]["video_(id)/page"]["videoInfoRes"]["item_list"][0]
            video_url = video_data["video"]["play_addr"]["url_list"][0]
            description = video_data["desc"]
            author_name = video_data["author"]["nickname"]
            comment_count = video_data["statistics"]["comment_count"]
            like_count = video_data["statistics"]["digg_count"]
            share_count = video_data["statistics"]["share_count"]
            collect_count = video_data["statistics"]["collect_count"]
            # 打印提取结果
            print("视频 URL:", video_url)
            print("描述:", description)
            print("作者昵称:", author_name)
            print("评论数:", comment_count)
            print("点赞数:", like_count)
            print("分享数:", share_count)
            print("收藏数:", collect_count)
            download(video_url,description,"C:\\Users\\JYC11\\Desktop\\Lecture\\download")
        
        except json.JSONDecodeError as e:
            print(f"JSONDecodeError: {e}")
            print(f"Error at position {e.pos}: {router_data_str[e.pos-100:e.pos+100]}")  # 输出错误位置附近的字符

fetch_dounyin_web()

https://www.iesdouyin.com/share/video/7427027349822016805/?region=CN&mid=7427027693159385866&u_code=0&did=MS4wLjABAAAA_3zUtK5KVDiCrVRUL29i2Wq9vGP0t80ov17h1pwjZXgarcqqw47Wnhvg2rxqdSbx&iid=MS4wLjABAAAANwkJuWIRFOzg5uCpDRpMj4OX-QryoDgn-yYlXQnRwQQ&with_sec_did=1&titleType=title&share_sign=r_vAcILc7cf08OJN_uUGtC7nxzRz0qM2zZgvKgdXtPo-&share_version=170400&ts=1730971519&from_aid=6383&from_ssr=1&from=web_code_link
window._ROUTER_DATA: {"loaderData":{"video_layout":null,"video_(id)\u002Fpage":{"ua":"Mozilla\u002F5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit\u002F604.1.38 (KHTML, like Gecko) Version\u002F11.0 Mobile\u002F15A372   Safari\u002F604.1","isSpider":false,"webId":"7434815123599279628","query":{"region":"CN","mid":"7427027693159385866","u_code":"0","did":"MS4wLjABAAAA_3zUtK5KVDiCrVRUL29i2Wq9vGP0t80ov17h1pwjZXgarcqqw47Wnhvg2rxqdSbx","iid":"MS4wLjABAAAANwkJuWIRFOzg5uCpDRpMj4OX-QryoDgn-yYlXQnRwQQ","with_sec_did":"1","titleType":"title","share_sign":"r_vAcILc7cf08OJN_uUGtC7nxzRz

### 使用 asyncio 和 aiohttp 提高爬虫的并发性能

In [5]:
import nest_asyncio
import asyncio
import aiohttp

nest_asyncio.apply()  # 使 asyncio.run() 可以在已运行事件循环中执行

async def fetch(url, session):
    try:
        async with session.get(url) as response:
            print(f"URL: {url} - 状态码: {response.status}")
            return await response.text()
    except Exception as e:
        print(f"抓取网页 {url} 时出错: {e}")
        return None

async def main(urls):
    """创建一个 session 并发地获取多个 URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(url, session) for url in urls]
         # 并发运行任务
        results = await asyncio.gather(*tasks)
        for result in results:
            print(result)

urls = [
    "https://example.com",
    "https://httpbin.org/get",
    "https://httpbin.org/delay/2",
    "https://jsonplaceholder.typicode.com/posts"
]

# 启动异步事件循环
asyncio.run(main(urls))


URL: https://jsonplaceholder.typicode.com/posts - 状态码: 200
URL: https://example.com - 状态码: 200
URL: https://httpbin.org/get - 状态码: 200
URL: https://httpbin.org/delay/2 - 状态码: 200
<!doctype html>
<html>
<head>
    <title>Example Domain</title>

    <meta charset="utf-8" />
    <meta http-equiv="Content-type" content="text/html; charset=utf-8" />
    <meta name="viewport" content="width=device-width, initial-scale=1" />
    <style type="text/css">
    body {
        background-color: #f0f0f2;
        margin: 0;
        padding: 0;
        font-family: -apple-system, system-ui, BlinkMacSystemFont, "Segoe UI", "Open Sans", "Helvetica Neue", Helvetica, Arial, sans-serif;
        
    }
    div {
        width: 600px;
        margin: 5em auto;
        padding: 2em;
        background-color: #fdfdff;
        border-radius: 0.5em;
        box-shadow: 2px 3px 7px 2px rgba(0,0,0,0.02);
    }
    a:link, a:visited {
        color: #38488f;
        text-decoration: none;
    }
    @media (max-widt

### 多线程

In [8]:
import threading
import time

def work(task_name, delay):
    print(f"{task_name} 开始，等待 {delay} 秒...")
    time.sleep(delay)
    print(f"{task_name} 完成")

# 定义多个线程
thread1 = threading.Thread(target=work, args=("线程 1", 2))
thread2 = threading.Thread(target=work, args=("线程 2", 3))
thread3 = threading.Thread(target=work, args=("线程 3", 1))

# 启动线程
thread1.start()
thread2.start()
thread3.start()

# 等待所有线程完成
thread1.join()
thread2.join()
thread3.join()

线程 1 开始，等待 2 秒...
线程 2 开始，等待 3 秒...
线程 3 开始，等待 1 秒...
线程 3 完成
线程 1 完成
线程 2 完成


### 继承Thread 类

In [12]:
import time
from threading import Thread

class Worker(Thread):
    def __init__(self, task_name, delay):
        Thread.__init__(self)
        self.task_name = task_name
        self.delay = delay

    def run(self):
        print(f"{task_name} 开始，等待 {delay} 秒...")
        time.sleep(delay)
        print(f"{task_name} 完成")

# 创建多个线程实例
tasks = [("任务1", 2), ("任务2", 3), ("任务3", 1)]

threads = []

for task_name, delay in tasks:
    thread = Worker(task_name, delay)
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()


任务1 开始，等待 2 秒...
任务2 开始，等待 3 秒...
任务3 开始，等待 1 秒...
任务3 完成
任务3 完成
任务3 完成


### 继承Thread 类,实现生产者以及消费者

In [14]:
import threading
import time
from queue import Queue  # 使用 Python 3 中的 queue 模块

class Worker(threading.Thread):  # 确保继承 threading.Thread
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        while True:
            message = self.queue.get()  # 从队列中获取任务
            if message == "DONE":  # 如果收到 "DONE"，则退出线程
                print(f"{threading.current_thread().name} finished.")
                self.queue.task_done()  # 标记任务完成
                break
            print(f"Consumer {threading.current_thread().name} got message: {message}")
            self.queue.task_done()  # 标记任务完成

class Producer(object):
    def __init__(self, queue):
        self.queue = queue

    def produce(self):
        for x in range(4):  # 生产 4 个任务
            message = f"task-{x}"
            print(f"Producer putting {message} into the queue.")
            self.queue.put(message)
            time.sleep(1)  # 模拟生产的延时

# 创建队列
task_queue = Queue()

# 创建生产者和消费者
producer = Producer(task_queue)

# 启动消费者线程
consumers = []
for i in range(2):  # 启动 2 个消费者线程
    consumer = Worker(task_queue)
    consumer.start()
    consumers.append(consumer)

# 启动生产者线程
producer.produce()

# 等待队列中所有任务完成
task_queue.join()

# 停止消费者线程
for i in range(2):
    task_queue.put("DONE")  # 向消费者线程发送停止信号

# 等待所有消费者线程退出
for consumer in consumers:
    consumer.join()

print("All tasks are done.")


Producer putting task-0 into the queue.
Consumer Thread-16 got message: task-0
Producer putting task-1 into the queue.
Consumer Thread-17 got message: task-1
Producer putting task-2 into the queue.
Consumer Thread-16 got message: task-2
Producer putting task-3 into the queue.
Consumer Thread-17 got message: task-3
Thread-16 finished.
Thread-17 finished.
All tasks are done.
