In [None]:
# 导入所需模块
import functools
import time
import asyncio
from typing import Any, Callable, Dict, List, Optional
from collections import defaultdict
import threading
import weakref

print("高级装饰器环境设置完成！")


In [None]:
# 装饰器工厂和参数化装饰器示例

# 1. 计时装饰器工厂
def timer(prefix: str = "执行"):
    """计时装饰器工厂"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            start_time = time.time()
            result = func(*args, **kwargs)
            end_time = time.time()
            print(f"{prefix} {func.__name__} 耗时: {end_time - start_time:.3f}秒")
            return result
        return wrapper
    return decorator

# 2. 重试装饰器工厂
def retry(max_attempts: int = 3, delay: float = 1.0):
    """重试装饰器工厂"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        print(f"函数 {func.__name__} 在 {max_attempts} 次尝试后失败")
                        raise
                    print(f"第 {attempt + 1} 次尝试失败: {e}, {delay}秒后重试...")
                    time.sleep(delay)
            return None
        return wrapper
    return decorator

# 3. 权限检查装饰器工厂
def require_permission(permission: str):
    """权限检查装饰器工厂"""
    def decorator(func: Callable) -> Callable:
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 模拟权限检查
            user_permissions = kwargs.get('user_permissions', [])
            if permission not in user_permissions:
                raise PermissionError(f"需要 {permission} 权限")
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 使用装饰器工厂
@timer("函数")
def calculate_sum(n: int) -> int:
    """计算1到n的和"""
    return sum(range(1, n + 1))

@retry(max_attempts=3, delay=0.5)
def unstable_function(failure_rate: float = 0.7):
    """不稳定的函数，用于演示重试"""
    import random
    if random.random() < failure_rate:
        raise Exception("随机失败")
    return "成功！"

@require_permission("admin")
def admin_only_function(message: str, user_permissions: List[str]):
    """只有管理员才能调用的函数"""
    return f"管理员操作: {message}"

# 演示装饰器工厂
print("=== 装饰器工厂演示 ===")

# 计时装饰器
result = calculate_sum(10000)
print(f"计算结果: {result}")

# 重试装饰器
try:
    result = unstable_function(0.3)  # 降低失败率
    print(f"重试结果: {result}")
except Exception as e:
    print(f"重试失败: {e}")

# 权限检查装饰器
try:
    result = admin_only_function("删除用户", user_permissions=["admin", "user"])
    print(result)
except PermissionError as e:
    print(f"权限错误: {e}")

try:
    result = admin_only_function("删除用户", user_permissions=["user"])
    print(result)
except PermissionError as e:
    print(f"权限错误: {e}")


In [None]:
# 缓存和性能优化装饰器

# 1. 简单缓存装饰器
def simple_cache(func: Callable) -> Callable:
    """简单的缓存装饰器"""
    cache = {}
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # 创建缓存键
        key = str(args) + str(sorted(kwargs.items()))
        
        if key in cache:
            print(f"缓存命中: {func.__name__}")
            return cache[key]
        
        print(f"计算中: {func.__name__}")
        result = func(*args, **kwargs)
        cache[key] = result
        return result
    
    wrapper.cache = cache  # type: ignore  # 允许外部访问缓存
    wrapper.cache_clear = lambda: cache.clear()  # type: ignore  # 清空缓存
    return wrapper

# 2. LRU缓存装饰器
def lru_cache(maxsize: int = 128):
    """LRU缓存装饰器"""
    def decorator(func: Callable) -> Callable:
        cache = {}
        access_order = []  # 记录访问顺序
        
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            key = str(args) + str(sorted(kwargs.items()))
            
            if key in cache:
                # 更新访问顺序
                access_order.remove(key)
                access_order.append(key)
                print(f"LRU缓存命中: {func.__name__}")
                return cache[key]
            
            # 计算新值
            result = func(*args, **kwargs)
            
            # 如果缓存已满，删除最久未使用的项目
            if len(cache) >= maxsize:
                oldest_key = access_order.pop(0)
                del cache[oldest_key]
            
            cache[key] = result
            access_order.append(key)
            print(f"LRU缓存存储: {func.__name__}")
            return result
        
        wrapper.cache_info = lambda: {  # type: ignore
            'size': len(cache),
            'maxsize': maxsize,
            'keys': list(cache.keys())
        }
        wrapper.cache_clear = lambda: cache.clear() or access_order.clear()  # type: ignore
        return wrapper
    
    return decorator

# 3. 性能监控装饰器
def performance_monitor(func: Callable) -> Callable:
    """性能监控装饰器"""
    call_count = 0
    total_time = 0
    
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        nonlocal call_count, total_time
        
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        
        call_count += 1
        execution_time = end_time - start_time
        total_time += execution_time
        
        print(f"函数 {func.__name__} 第 {call_count} 次调用，耗时: {execution_time:.3f}秒")
        return result
    
    wrapper.get_stats = lambda: {  # type: ignore
        'call_count': call_count,
        'total_time': total_time,
        'average_time': total_time / call_count if call_count > 0 else 0
    }
    return wrapper

# 使用缓存和性能装饰器
@simple_cache
def fibonacci(n: int) -> int:
    """计算斐波那契数列"""
    if n <= 1:
        return n
    return fibonacci(n - 1) + fibonacci(n - 2)

@lru_cache(maxsize=3)
def expensive_computation(x: int, y: int) -> int:
    """模拟昂贵的计算"""
    time.sleep(0.1)  # 模拟计算时间
    return x * y + x ** 2

@performance_monitor
def matrix_multiply(size: int) -> int:
    """矩阵乘法性能测试"""
    import random
    matrix = [[random.randint(1, 10) for _ in range(size)] for _ in range(size)]
    result = 0
    for i in range(size):
        for j in range(size):
            result += matrix[i][j] * matrix[j][i]
    return result

# 演示缓存和性能装饰器
print("=== 缓存和性能装饰器演示 ===")

# 测试简单缓存
print("测试斐波那契缓存:")
print(f"fibonacci(10) = {fibonacci(10)}")
print(f"fibonacci(10) = {fibonacci(10)}")  # 应该使用缓存
print(f"缓存大小: {len(fibonacci.cache)}")  # type: ignore

# 测试LRU缓存
print("\n测试LRU缓存:")
expensive_computation(2, 3)
expensive_computation(4, 5)
expensive_computation(6, 7)
expensive_computation(8, 9)  # 这会导致第一个被清除
expensive_computation(2, 3)  # 这会重新计算
print(f"LRU缓存信息: {expensive_computation.cache_info()}")  # type: ignore

# 测试性能监控
print("\n测试性能监控:")
for i in range(3):
    matrix_multiply(50)
print(f"性能统计: {matrix_multiply.get_stats()}")  # type: ignore


In [None]:
# 类装饰器和方法装饰器示例

# 1. 类装饰器
def add_logging(cls):
    """为类添加日志功能的装饰器"""
    original_init = cls.__init__
    
    def new_init(self, *args, **kwargs):
        print(f"创建 {cls.__name__} 实例")
        original_init(self, *args, **kwargs)
        print(f"{cls.__name__} 实例创建完成")
    
    cls.__init__ = new_init
    return cls

def singleton(cls):
    """单例装饰器"""
    instances = {}
    
    def get_instance(*args, **kwargs):
        if cls not in instances:
            instances[cls] = cls(*args, **kwargs)
        return instances[cls]
    
    return get_instance

def add_repr(cls):
    """为类添加repr方法的装饰器"""
    def __repr__(self):
        class_name = self.__class__.__name__
        attrs = [f"{key}={value!r}" for key, value in self.__dict__.items()]
        return f"{class_name}({', '.join(attrs)})"
    
    cls.__repr__ = __repr__
    return cls

# 2. 方法装饰器
def validate_types(**expected_types):
    """参数类型验证装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # 验证位置参数
            func_args = func.__code__.co_varnames[:func.__code__.co_argcount]
            for i, (arg_name, arg_value) in enumerate(zip(func_args, args)):
                if arg_name in expected_types:
                    expected_type = expected_types[arg_name]
                    if not isinstance(arg_value, expected_type):
                        raise TypeError(
                            f"参数 {arg_name} 期望类型 {expected_type.__name__}, "
                            f"但得到 {type(arg_value).__name__}"
                        )
            
            # 验证关键字参数
            for arg_name, arg_value in kwargs.items():
                if arg_name in expected_types:
                    expected_type = expected_types[arg_name]
                    if not isinstance(arg_value, expected_type):
                        raise TypeError(
                            f"参数 {arg_name} 期望类型 {expected_type.__name__}, "
                            f"但得到 {type(arg_value).__name__}"
                        )
            
            return func(*args, **kwargs)
        return wrapper
    return decorator

def deprecated(reason: str = ""):
    """废弃方法装饰器"""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            import warnings
            message = f"函数 {func.__name__} 已废弃"
            if reason:
                message += f": {reason}"
            warnings.warn(message, DeprecationWarning, stacklevel=2)
            return func(*args, **kwargs)
        return wrapper
    return decorator

# 3. 异步装饰器
def async_retry(max_attempts: int = 3, delay: float = 1.0):
    """异步重试装饰器"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            for attempt in range(max_attempts):
                try:
                    return await func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts - 1:
                        raise
                    print(f"异步函数 {func.__name__} 第 {attempt + 1} 次尝试失败: {e}")
                    await asyncio.sleep(delay)
            return None
        return wrapper
    return decorator

def async_timeout(timeout_seconds: float):
    """异步超时装饰器"""
    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args, **kwargs):
            try:
                return await asyncio.wait_for(func(*args, **kwargs), timeout=timeout_seconds)
            except asyncio.TimeoutError:
                print(f"异步函数 {func.__name__} 执行超时 ({timeout_seconds}秒)")
                raise
        return wrapper
    return decorator

# 使用装饰器示例
@add_logging
@add_repr
class User:
    def __init__(self, name: str, age: int):
        self.name = name
        self.age = age

@singleton
class Database:
    def __init__(self):
        self.connection = "数据库连接"
        print("数据库连接已建立")

class Calculator:
    @validate_types(a=int, b=int)
    def add(self, a, b):
        return a + b
    
    @deprecated("使用 add 方法代替")
    def sum(self, a, b):
        return a + b

# 异步函数示例
@async_retry(max_attempts=3, delay=0.5)
async def unreliable_async_function():
    """不可靠的异步函数"""
    import random
    if random.random() < 0.7:
        raise Exception("随机失败")
    return "成功!"

@async_timeout(2.0)
async def slow_async_function():
    """慢速异步函数"""
    await asyncio.sleep(3.0)  # 这会导致超时
    return "完成"

# 演示装饰器
print("=== 类装饰器和方法装饰器演示 ===")

# 测试类装饰器
print("1. 类装饰器测试:")
user = User("Alice", 30)
print(f"用户对象: {user}")

# 测试单例装饰器
print("\n2. 单例装饰器测试:")
db1 = Database()
db2 = Database()
print(f"数据库实例相同: {db1 is db2}")

# 测试方法装饰器
print("\n3. 方法装饰器测试:")
calc = Calculator()
print(f"正确类型: {calc.add(5, 3)}")

try:
    calc.add("5", 3)  # 这会引发类型错误
except TypeError as e:
    print(f"类型错误: {e}")

# 测试废弃装饰器
print("\n4. 废弃装饰器测试:")
import warnings
with warnings.catch_warnings(record=True) as w:
    warnings.simplefilter("always")
    result = calc.sum(1, 2)
    if w:
        print(f"废弃警告: {w[0].message}")

# 测试异步装饰器
print("\n5. 异步装饰器测试:")

async def test_async_decorators():
    # 测试异步重试
    try:
        result = await unreliable_async_function()
        print(f"异步重试成功: {result}")
    except Exception as e:
        print(f"异步重试失败: {e}")
    
    # 测试异步超时
    try:
        result = await slow_async_function()
        print(f"异步超时成功: {result}")
    except asyncio.TimeoutError:
        print("异步函数执行超时")

await test_async_decorators()
