# Python 网络编程

本教程将学习Python中的网络编程，包括socket编程、HTTP请求、Web爬虫等内容。

## 1. Socket 编程基础 - TCP 服务器和客户端


In [1]:
import socket

# TCP 服务器示例
def tcp_server():
    """创建一个简单的TCP服务器"""
    # 创建socket对象
    # AF_INET 表示使用 IPv4 地址
    # SOCK_STREAM 表示使用 TCP 协议
    server_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 设置地址重用
    server_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    
    # 绑定地址和端口
    host = 'localhost'
    port = 8888
    server_socket.bind((host, port))
    
    # 开始监听
    # 5 表示最大连接数
    server_socket.listen(5)
    print(f"服务器启动，监听 {host}:{port}")
    
    while True:
        # 接受客户端连接
        client_socket, address = server_socket.accept()
        print(f"收到来自 {address} 的连接")
        
        # 接收数据
        data = client_socket.recv(1024).decode('utf-8')
        print(f"接收到数据: {data}")
        
        # 发送响应
        response = f"服务器收到: {data}"
        client_socket.send(response.encode('utf-8'))
        
        # 关闭客户端连接
        client_socket.close()
        
        # 演示用，只处理一次连接
        break
    
    server_socket.close()

# 注意：在实际使用中，服务器应该在单独的线程或进程中运行
# print("TCP服务器代码（需要单独运行）:")
print("TCP服务器代码已定义，可以在后台运行")


TCP服务器代码已定义，可以在后台运行


In [8]:
# TCP 客户端示例
def tcp_client():
    """创建一个简单的TCP客户端"""
    # 创建socket对象
    client_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    
    # 连接到服务器
    host = 'localhost'
    port = 8888
    
    try:
        client_socket.connect((host, port))
        print(f"已连接到服务器 {host}:{port}")
        
        # 发送数据
        message = "Hello, Server!"
        client_socket.send(message.encode('utf-8'))
        print(f"发送: {message}")
        
        # 接收响应
        response = client_socket.recv(1024).decode('utf-8')
        print(f"服务器响应: {response}")
        
    except ConnectionRefusedError:
        print("无法连接到服务器，请确保服务器正在运行")
    finally:
        client_socket.close()

print("TCP客户端代码已定义")
# 注意：运行客户端前需要先启动服务器


TCP客户端代码已定义


## 2. HTTP 请求 - urllib 和 requests 库


In [None]:
# 使用 urllib（Python标准库）
from urllib.request import urlopen, Request
from urllib.error import URLError
import json

# 简单的GET请求
try:
    # 发送GET请求
    response = urlopen('https://httpbin.org/get')
    data = response.read().decode('utf-8')
    print("使用 urllib 发送GET请求:")
    print(f"状态码: {response.status}")
    print(f"响应内容（前200字符）: {data[:200]}")
except URLError as e:
    print(f"请求失败: {e}")

print("\n" + "="*50)


In [9]:
# 使用 requests 库（需要安装: pip install requests）
# 这是一个更强大和易用的HTTP库

try:
    import requests
    
    # GET请求
    response = requests.get('https://httpbin.org/get', params={'key': 'value'})
    print("使用 requests 发送GET请求:")
    print(f"状态码: {response.status_code}")
    print(f"响应JSON: {response.json()}")
    
    # POST请求
    post_data = {'name': '张三', 'age': 25}
    response = requests.post('https://httpbin.org/post', data=post_data)
    print(f"\nPOST请求状态码: {response.status_code}")
    print(f"POST响应: {response.json()}")
    
    # 设置请求头
    headers = {'User-Agent': 'MyApp/1.0'}
    response = requests.get('https://httpbin.org/headers', headers=headers)
    print(f"\n带自定义请求头: {response.json()['headers']}")
    
except ImportError:
    print("requests库未安装，使用 'pip install requests' 安装")
except Exception as e:
    print(f"请求失败: {e}")


使用 requests 发送GET请求:
状态码: 200
响应JSON: {'args': {'key': 'value'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.32.3', 'X-Amzn-Trace-Id': 'Root=1-692403fa-46a9467b0d4289074bb95fbd'}, 'origin': '209.50.227.209', 'url': 'https://httpbin.org/get?key=value'}

POST请求状态码: 200
POST响应: {'args': {}, 'data': '', 'files': {}, 'form': {'age': '25', 'name': '张三'}, 'headers': {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Content-Length': '30', 'Content-Type': 'application/x-www-form-urlencoded', 'Host': 'httpbin.org', 'User-Agent': 'python-requests/2.32.3', 'X-Amzn-Trace-Id': 'Root=1-692403fc-589aaae206b80b820c6ba741'}, 'json': None, 'origin': '209.50.227.209', 'url': 'https://httpbin.org/post'}

带自定义请求头: {'Accept': '*/*', 'Accept-Encoding': 'gzip, deflate, br, zstd', 'Host': 'httpbin.org', 'User-Agent': 'MyApp/1.0', 'X-Amzn-Trace-Id': 'Root=1-692403fd-6fb79a0904689a996e0af6e6'}


## 3. 简单的Web爬虫示例


In [None]:
# 简单的网页爬虫（演示用）
import re
from urllib.request import urlopen
from urllib.error import URLError

def simple_crawler(url):
    """简单的网页爬虫示例"""
    try:
        # 发送请求
        response = urlopen(url)
        html = response.read().decode('utf-8')
        
        # 提取标题
        title_match = re.search(r'<title>(.*?)</title>', html, re.IGNORECASE)
        if title_match:
            print(f"网页标题: {title_match.group(1)}")
        
        # 提取所有链接（简单示例）
        links = re.findall(r'href=["\'](.*?)["\']', html)
        print(f"\n找到 {len(links)} 个链接")
        print("前5个链接:")
        for link in links[:5]:
            print(f"  - {link}")
            
    except URLError as e:
        print(f"爬取失败: {e}")

# 演示（使用一个公开的测试网站）
# simple_crawler('https://httpbin.org/html')
print("网页爬虫函数已定义，可以使用 simple_crawler(url) 进行爬取")
print("注意：实际爬虫需要处理更多情况，如反爬虫、JavaScript渲染等")


## 4. 使用 urllib.parse 处理URL


In [10]:
from urllib.parse import urlparse, urljoin, urlencode, quote, unquote

# 解析URL
url = "https://www.example.com/path/to/page?param1=value1&param2=value2"
parsed = urlparse(url)
print("URL解析:")
print(f"协议: {parsed.scheme}")
print(f"域名: {parsed.netloc}")
print(f"路径: {parsed.path}")
print(f"查询参数: {parsed.query}")
print(f"片段: {parsed.fragment}")

# URL拼接
base_url = "https://www.example.com"
relative_path = "/page.html"
full_url = urljoin(base_url, relative_path)
print(f"\nURL拼接: {full_url}")

# 构建查询字符串
params = {'name': '张三', 'age': 25, 'city': '北京'}
query_string = urlencode(params)
print(f"\n查询字符串: {query_string}")

# URL编码和解码
original = "你好世界"
encoded = quote(original)
decoded = unquote(encoded)
print(f"\nURL编码: {original} -> {encoded}")
print(f"URL解码: {encoded} -> {decoded}")


URL解析:
协议: https
域名: www.example.com
路径: /path/to/page
查询参数: param1=value1&param2=value2
片段: 

URL拼接: https://www.example.com/page.html

查询字符串: name=%E5%BC%A0%E4%B8%89&age=25&city=%E5%8C%97%E4%BA%AC

URL编码: 你好世界 -> %E4%BD%A0%E5%A5%BD%E4%B8%96%E7%95%8C
URL解码: %E4%BD%A0%E5%A5%BD%E4%B8%96%E7%95%8C -> 你好世界


## 5. 使用上下文管理器 (Context Managers)

使用 `with` 语句可以自动管理 socket 的关闭，避免资源泄漏。


In [None]:
import socket

def check_port(host, port):
    """检查端口是否开放"""
    try:
        # 使用 with 语句自动关闭 socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            s.settimeout(1)
            # connect_ex 返回 0 表示端口是开放的，否则是关闭的
            result = s.connect_ex((host, port))
            if result == 0:
                print(f"端口 {port} 是开放的")
            else:
                print(f"端口 {port} 是关闭的")
    except Exception as e:
        print(f"检查出错: {e}")

check_port('www.baidu.com', 80)
check_port('www.baidu.com', 81)


端口 80 是开放的
端口 81 是开放的


## 6. 异步网络编程 (Asyncio)

对于高并发的网络应用，`asyncio` 是更好的选择。


In [13]:
import asyncio

async def fetch_url(url):
    """模拟异步获取URL"""
    print(f"开始获取: {url}")
    await asyncio.sleep(1)  # 模拟网络延迟
    print(f"完成获取: {url}")
    return f"Content of {url}"

async def main():
    urls = ['http://example.com/1', 'http://example.com/2', 'http://example.com/3']
    # 并发执行任务
    tasks = [fetch_url(url) for url in urls]
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")

# 在 Jupyter 中运行 asyncio
await main()
print("在 Jupyter Notebook 中，可以直接使用 await main() 运行")


开始获取: http://example.com/1
开始获取: http://example.com/2
开始获取: http://example.com/3
完成获取: http://example.com/1
完成获取: http://example.com/2
完成获取: http://example.com/3
所有结果: ['Content of http://example.com/1', 'Content of http://example.com/2', 'Content of http://example.com/3']
在 Jupyter Notebook 中，可以直接使用 await main() 运行


## 7. 健壮的错误处理

网络请求经常失败，需要实现重试机制。


In [None]:
import time
import random

def robust_request(url, max_retries=3):
    """带有重试机制的请求模拟"""
    for i in range(max_retries):
        try:
            print(f"尝试请求 {url} (第 {i+1} 次)...")
            # 模拟随机失败
            if random.random() < 0.7:
                raise ConnectionError("连接超时")
            
            print("请求成功！")
            return "Success Data"
            
        except Exception as e:
            print(f"请求失败: {e}")
            if i < max_retries - 1:
                wait_time = 2 ** i  # 指数退避
                print(f"等待 {wait_time} 秒后重试...")
                time.sleep(wait_time)
            else:
                print("达到最大重试次数，放弃")
                raise

# robust_request('http://unstable-api.com')
