In [None]:
import time
import functools

# 1.1 简单的装饰器：计算函数执行时间
def timer(func):
    @functools.wraps(func)  # 保留原函数的元信息
    def wrapper(*args, **kwargs):
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        print(f"{func.__name__} 执行时间: {end_time - start_time:.4f} 秒")
        return result
    return wrapper

# 使用装饰器
@timer
def slow_function():
    time.sleep(1)  # 模拟耗时操作
    print("函数执行完成")

print("测试计时器装饰器：")
slow_function()

# 1.2 带参数的装饰器：访问控制
def require_permission(permission):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            if check_permission(permission):
                return func(*args, **kwargs)
            else:
                raise PermissionError(f"需要 {permission} 权限")
        return wrapper
    return decorator

def check_permission(permission):
    # 模拟权限检查
    allowed_permissions = ["admin", "user"]
    return permission in allowed_permissions

# 使用带参数的装饰器
@require_permission("admin")
def admin_function():
    print("执行管理员操作")

@require_permission("guest")
def guest_function():
    print("执行访客操作")

print("\n测试权限装饰器：")
try:
    print("尝试执行管理员函数：")
    admin_function()
except PermissionError as e:
    print(f"错误：{e}")

try:
    print("\n尝试执行访客函数：")
    guest_function()
except PermissionError as e:
    print(f"错误：{e}")


In [None]:
# 2.1 生成器函数
def fibonacci(n):
    """生成斐波那契数列的生成器"""
    a, b = 0, 1
    for _ in range(n):
        yield a
        a, b = b, a + b

print("使用生成器获取斐波那契数列：")
for num in fibonacci(10):
    print(num, end=' ')

# 2.2 生成器表达式
print("\n\n使用生成器表达式：")
squares = (x**2 for x in range(5))
print("平方数：", end=' ')
for square in squares:
    print(square, end=' ')

# 2.3 自定义迭代器
class CountDown:
    """倒计时迭代器"""
    def __init__(self, start):
        self.start = start
    
    def __iter__(self):
        return self
    
    def __next__(self):
        if self.start <= 0:
            raise StopIteration
        self.start -= 1
        return self.start + 1

print("\n\n使用自定义迭代器：")
countdown = CountDown(5)
print("倒计时：", end=' ')
for num in countdown:
    print(num, end=' ')


In [None]:
from contextlib import contextmanager
import time



# 3.2 使用装饰器创建上下文管理器
@contextmanager
def timer():
    start = time.time()
    yield
    duration = time.time() - start
    print(f"代码块执行时间：{duration:.4f} 秒")

# 使用装饰器形式的上下文管理器
print("\n使用装饰器形式的上下文管理器：")
with timer():
    time.sleep(1)  # 模拟耗时操作
    print("执行了一些操作")

# 3.3 实际应用示例：临时更改工作目录
import os
@contextmanager
def change_dir(path):
    old_dir = os.getcwd()
    try:
        os.chdir(path)
        yield
    finally:
        os.chdir(old_dir)

# 使用目录切换上下文管理器
print("\n使用目录切换上下文管理器：")
print(f"当前目录: {os.getcwd()}")
with change_dir(".."):
    print(f"在with语句内的目录: {os.getcwd()}")
print(f"退出with语句后的目录: {os.getcwd()}")


In [None]:
import re

# 4.1 基本匹配
text = "我的电话是123-4567-8901，邮箱是example@email.com"

# 匹配电话号码
phone_pattern = r'\d{3}-\d{4}-\d{4}'
phone = re.search(phone_pattern, text)
print("找到的电话号码:", phone.group() if phone else "未找到")

# 匹配邮箱
email_pattern = r'[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}'
email = re.search(email_pattern, text)
print("找到的邮箱:", email.group() if email else "未找到")

# 4.2 替换操作
# 隐藏电话号码中间四位
hidden_phone = re.sub(r'(\d{3})-\d{4}-(\d{4})', r'\1-****-\2', text)
print("\n隐藏后的文本:", hidden_phone)

# 4.3 多次匹配
text_multi = """
联系方式：
电话：123-4567-8901
手机：186-7654-3210
传真：025-8888-9999
"""

# 找出所有电话号码
all_phones = re.findall(phone_pattern, text_multi)
print("\n所有电话号码:")
for phone in all_phones:
    print(phone)

# 4.4 分组匹配
date_text = "今天是2024-03-15"
date_pattern = r'(\d{4})-(\d{2})-(\d{2})'
match = re.search(date_pattern, date_text)
if match:
    year, month, day = match.groups()
    print(f"\n日期解析: {year}年{month}月{day}日")


In [None]:
import threading
import multiprocessing
import time
import requests

# 5.1 多线程示例：下载网页
def download_page(url):
    print(f"开始下载 {url}")
    try:
        response = requests.get(url)
        print(f"完成下载 {url}, 状态码: {response.status_code}")
    except Exception as e:
        print(f"下载 {url} 失败: {e}")

# 创建多个线程下载网页
urls = [
    "https://www.python.org",
    "https://www.google.com",
    "https://www.github.com"
]

print("使用多线程下载网页：")
threads = []
for url in urls:
    thread = threading.Thread(target=download_page, args=(url,))
    threads.append(thread)
    thread.start()

# 等待所有线程完成
for thread in threads:
    thread.join()

# 5.2 多进程示例：CPU密集型计算
def calculate_square(numbers, result_dict):
    """计算数字的平方并存储在共享字典中"""
    for n in numbers:
        result_dict[n] = n * n
        time.sleep(0.1)  # 模拟耗时计算

if __name__ == '__main__':
    # 创建进程间共享的字典
    manager = multiprocessing.Manager()
    result_dict = manager.dict()
    
    # 将数据分配给多个进程
    numbers = list(range(10))
    process1 = multiprocessing.Process(
        target=calculate_square,
        args=(numbers[:5], result_dict)
    )
    process2 = multiprocessing.Process(
        target=calculate_square,
        args=(numbers[5:], result_dict)
    )
    
    print("\n使用多进程进行计算：")
    start_time = time.time()
    
    # 启动进程
    process1.start()
    process2.start()
    
    # 等待进程完成
    process1.join()
    process2.join()
    
    end_time = time.time()
    print(f"计算完成，耗时: {end_time - start_time:.2f} 秒")
    print("结果:", dict(result_dict))


In [None]:
import socket
import json
from http.server import HTTPServer, SimpleHTTPRequestHandler
import threading
import requests

# 6.1 基本的Socket服务器
def start_socket_server():
    # 创建Socket服务器
    server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    server.bind(('localhost', 12345))
    server.listen(1)
    print("Socket服务器启动在 localhost:12345")
    
    # 接受连接
    conn, addr = server.accept()
    print(f"客户端连接：{addr}")
    
    # 接收数据
    data = conn.recv(1024).decode()
    print(f"收到数据：{data}")
    
    # 发送响应
    response = f"服务器收到：{data}"
    conn.send(response.encode())
    
    # 关闭连接
    conn.close()
    server.close()

# 6.2 Socket客户端
def socket_client():
    # 创建客户端Socket
    client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    client.connect(('localhost', 12345))
    
    # 发送数据
    message = "Hello, Server!"
    client.send(message.encode())
    
    # 接收响应
    response = client.recv(1024).decode()
    print(f"服务器响应：{response}")
    
    # 关闭连接
    client.close()

# 6.3 简单的HTTP服务器
class CustomHandler(SimpleHTTPRequestHandler):
    def do_GET(self):
        self.send_response(200)
        self.send_header('Content-type', 'application/json')
        self.end_headers()
        
        response = {
            'message': 'Hello from Python HTTP Server!',
            'path': self.path
        }
        
        self.wfile.write(json.dumps(response).encode())

def start_http_server():
    server = HTTPServer(('localhost', 8000), CustomHandler)
    print("HTTP服务器启动在 http://localhost:8000")
    server.serve_forever()

# 启动HTTP服务器
print("启动HTTP服务器...")
http_thread = threading.Thread(target=start_http_server)
http_thread.daemon = True  # 设置为守护线程，这样主程序退出时会自动结束
http_thread.start()

# 等待服务器启动
time.sleep(1)

# 发送HTTP请求
print("\n发送HTTP请求...")
try:
    response = requests.get('http://localhost:8000')
    print("HTTP响应:", response.json())
except Exception as e:
    print(f"请求失败: {e}")

# 启动Socket服务器和客户端
print("\n启动Socket服务器和客户端...")
server_thread = threading.Thread(target=start_socket_server)
server_thread.daemon = True
server_thread.start()

# 等待服务器启动
time.sleep(1)

# 运行Socket客户端
socket_client()


In [2]:
def nums(n):
    i = 0
    while i < n:
        yield i
        i += 1
print(list(nums(10)))

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
