###  基础网络请求

In [None]:
import urllib.request

# 简单GET请求
response = urllib.request.urlopen('http://httpbin.org/get')
print(response.read().decode('utf-8'))  # 获取响应内容
print(response.status)  # 状态码

### requests库（推荐）

In [None]:
import requests

# GET请求
response = requests.get('https://httpbin.org/get')
print(response.text)     # 响应文本
print(response.json())   # 如果返回JSON
print(response.status_code)  # 状态码

# 带参数的GET
params = {'key1': 'value1', 'key2': 'value2'}
response = requests.get('https://httpbin.org/get', params=params)

# POST请求
data = {'username': 'admin', 'password': '123456'}
response = requests.post('https://httpbin.org/post', data=data)

# 带JSON的POST
json_data = {'name': '张三', 'age': 25}
response = requests.post('https://httpbin.org/post', json=json_data)

# 设置请求头
headers = {'User-Agent': 'my-app/1.0'}
response = requests.get('https://httpbin.org/headers', headers=headers)

### TCP Socket

In [None]:
import socket

# 客户端
def tcp_client():
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('127.0.0.1', 8888))  # 连接服务器
    client.send(b'Hello Server!')        # 发送数据
    response = client.recv(1024)         # 接收数据
    print(f"收到: {response.decode()}")
    client.close()

# 服务器端
def tcp_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('127.0.0.1', 8888))     # 绑定地址和端口
    server.listen(5)                     # 开始监听
    
    print("服务器启动...")
    while True:
        client, addr = server.accept()   # 接受连接
        print(f"客户端连接: {addr}")
        
        data = client.recv(1024)         # 接收数据
        print(f"收到: {data.decode()}")
        
        client.send(b'Hello Client!')    # 发送响应
        client.close()

# 先运行服务器，再运行客户端

### UDP Socket

In [None]:
import socket

# UDP客户端
def udp_client():
    client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    client.sendto(b'Hello UDP Server!', ('127.0.0.1', 9999))
    
    data, addr = client.recvfrom(1024)
    print(f"从{addr}收到: {data.decode()}")
    client.close()

# UDP服务器
def udp_server():
    server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    server.bind(('127.0.0.1', 9999))
    
    print("UDP服务器启动...")
    while True:
        data, addr = server.recvfrom(1024)
        print(f"从{addr}收到: {data.decode()}")
        
        server.sendto(b'Hello UDP Client!', addr)

### 网页内容抓取

In [None]:
import requests
from bs4 import BeautifulSoup  # 需要安装: pip install beautifulsoup4

def fetch_webpage(url):
    """抓取网页内容"""
    try:
        response = requests.get(url, timeout=5)
        response.raise_for_status()  # 检查HTTP错误
        response.encoding = 'utf-8'  # 设置编码
        
        # 解析HTML
        soup = BeautifulSoup(response.text, 'html.parser')
        
        # 提取标题
        title = soup.title.string if soup.title else "无标题"
        print(f"网页标题: {title}")
        
        # 提取所有链接
        links = [a.get('href') for a in soup.find_all('a')]
        print(f"找到 {len(links)} 个链接")
        
        # 提取正文（简单示例）
        paragraphs = [p.get_text() for p in soup.find_all('p')[:3]]
        print("前3个段落:", paragraphs[:100])  # 显示前100字符
        
    except requests.RequestException as e:
        print(f"请求失败: {e}")

# 使用
fetch_webpage('https://www.python.org')

### API客户端

In [None]:
import requests

class WeatherAPI:
    """天气API客户端"""
    
    def __init__(self, api_key):
        self.api_key = api_key
        self.base_url = "http://api.openweathermap.org/data/2.5/weather"
    
    def get_weather(self, city):
        """获取城市天气"""
        params = {
            'q': city,
            'appid': self.api_key,
            'units': 'metric',  # 摄氏度
            'lang': 'zh_cn'     # 中文
        }
        
        try:
            response = requests.get(self.base_url, params=params, timeout=10)
            data = response.json()
            
            if response.status_code == 200:
                return {
                    'city': data['name'],
                    'temp': data['main']['temp'],
                    'humidity': data['main']['humidity'],
                    'description': data['weather'][0]['description']
                }
            else:
                return f"错误: {data.get('message', '未知错误')}"
                
        except requests.RequestException as e:
            return f"请求失败: {e}"

# 使用（需要真实的API key）
# weather = WeatherAPI("your_api_key")
# print(weather.get_weather("Beijing"))

### 文件下载器

In [None]:
import requests
import os

def download_file(url, save_path=None):
    """下载文件"""
    try:
        response = requests.get(url, stream=True, timeout=30)
        response.raise_for_status()
        
        # 如果没有指定保存路径，使用URL中的文件名
        if save_path is None:
            save_path = url.split('/')[-1]
        
        # 获取文件大小
        file_size = int(response.headers.get('content-length', 0))
        
        print(f"下载: {url}")
        print(f"保存到: {save_path}")
        print(f"文件大小: {file_size / 1024:.1f} KB")
        
        # 下载并保存
        with open(save_path, 'wb') as f:
            downloaded = 0
            for chunk in response.iter_content(chunk_size=8192):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    
                    # 显示进度
                    if file_size:
                        percent = downloaded / file_size * 100
                        print(f"\r进度: {percent:.1f}%", end='')
        
        print(f"\n下载完成: {save_path}")
        return save_path
        
    except requests.RequestException as e:
        print(f"下载失败: {e}")
        return None

# 使用
# download_file("https://example.com/file.zip", "local_file.zip")

### 端口扫描器

In [None]:
import socket
import threading
from queue import Queue

def scan_port(host, port, timeout=1):
    """扫描单个端口"""
    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(timeout)
        result = sock.connect_ex((host, port))
        
        if result == 0:
            print(f"端口 {port}: 开放")
        sock.close()
        
    except socket.error:
        pass

def port_scanner(host, start_port=1, end_port=1000, max_threads=100):
    """多线程端口扫描"""
    print(f"扫描主机: {host}")
    print(f"端口范围: {start_port}-{end_port}")
    
    # 创建任务队列
    queue = Queue()
    for port in range(start_port, end_port + 1):
        queue.put(port)
    
    def worker():
        while not queue.empty():
            port = queue.get()
            scan_port(host, port)
            queue.task_done()
    
    # 创建并启动线程
    threads = []
    for _ in range(min(max_threads, end_port - start_port + 1)):
        t = threading.Thread(target=worker)
        t.start()
        threads.append(t)
    
    # 等待所有线程完成
    queue.join()
    print("扫描完成")

# 使用（请仅扫描自己拥有的主机）
# port_scanner("127.0.0.1", 1, 100)