# Python Collections.defaultdict 完全指南

## 📚 学习目标
- 理解 defaultdict 的工作原理和优势
- 掌握 defaultdict 的基本用法和语法
- 学会在实际项目中应用 defaultdict
- 了解性能优化和最佳实践
- 掌握常见的使用场景和设计模式

## 🎯 适用场景
- 分组数据处理
- 统计计数
- 嵌套数据结构构建
- 避免 KeyError 异常
- 后端数据聚合和处理


## 1. 基础概念对比

### 普通 dict vs defaultdict
普通字典在访问不存在的键时会抛出 `KeyError`，而 `defaultdict` 会自动创建默认值。


In [2]:

from collections import defaultdict
from typing import Dict, List, Set, Any
import json

# 普通字典的问题演示
print("=== 普通字典的 KeyError 问题 ===")
normal_dict: Dict[str, List[str]] = {}

try:
    # 这会抛出 KeyError
    normal_dict['users'].append('张三')
except KeyError as e:
    print(f"KeyError: {e}")
    
# 传统解决方案：需要检查键是否存在
if 'users' not in normal_dict:
    normal_dict['users'] = []
normal_dict['users'].append('张三')
print(f"普通字典结果: {normal_dict}")

print("\n=== defaultdict 的优雅解决方案 ===")
# defaultdict 自动创建默认值
default_dict = defaultdict(list)  # 默认工厂函数是 list
print('default_dict:',default_dict)
default_dict['users'].append('张三')  # 不会抛出异常
default_dict['users'].append('李四')
default_dict['admins'].append('王五')

print(f"defaultdict 结果: {dict(default_dict)}")
print(f"类型: {type(default_dict)}")


=== 普通字典的 KeyError 问题 ===
KeyError: 'users'
普通字典结果: {'users': ['张三']}

=== defaultdict 的优雅解决方案 ===
default_dict: defaultdict(<class 'list'>, {})
defaultdict 结果: {'users': ['张三', '李四'], 'admins': ['王五']}
类型: <class 'collections.defaultdict'>


🤔疑问：

`default_dict = defaultdict(list)`这行代码在做什么呢?

这行代码创建了一个特殊的字典default_dict，当访问不存在的键时，会自动创建一个空列表[]作为默认值。这比普通字典更方便，因为不需要手动检查键是否存在。

原因：list作为工厂函数传入defaultdict，它会在需要时自动调用list()来创建新的空列表。

## 2. 常用工厂函数

`defaultdict` 的第一个参数是工厂函数，决定了默认值的类型。


In [3]:
print("=== 不同工厂函数的使用 ===")

# 1. list - 用于分组数据
groups = defaultdict(list)
print(f"defaultdict(list) 默认值: {groups['new_key']}")

# 2. int - 用于计数
counters = defaultdict(int)
print(f"defaultdict(int) 默认值: {counters['new_key']}")

# 3. set - 用于去重收集
unique_items = defaultdict(set)
print(f"defaultdict(set) 默认值: {unique_items['new_key']}")

# 4. dict - 用于嵌套字典
nested = defaultdict(dict)
print(f"defaultdict(dict) 默认值: {nested['new_key']}")

# 5. 自定义工厂函数
def default_value():
    return "未设置"

custom_default = defaultdict(default_value)
print(f"自定义工厂函数默认值: {custom_default['new_key']}")

# 6. lambda 表达式作为工厂函数
lambda_default = defaultdict(lambda: {'status': 'active', 'count': 0})
print(f"lambda 工厂函数默认值: {lambda_default['new_key']}")

print(f"\n所有 defaultdict 内容:")
print(f"groups: {dict(groups)}")
print(f"counters: {dict(counters)}")
print(f"unique_items: {dict(unique_items)}")
print(f"nested: {dict(nested)}")
print(f"custom_default: {dict(custom_default)}")
print(f"lambda_default: {dict(lambda_default)}")


=== 不同工厂函数的使用 ===
defaultdict(list) 默认值: []
defaultdict(int) 默认值: 0
defaultdict(set) 默认值: set()
defaultdict(dict) 默认值: {}
自定义工厂函数默认值: 未设置
lambda 工厂函数默认值: {'status': 'active', 'count': 0}

所有 defaultdict 内容:
groups: {'new_key': []}
counters: {'new_key': 0}
unique_items: {'new_key': set()}
nested: {'new_key': {}}
custom_default: {'new_key': '未设置'}
lambda_default: {'new_key': {'status': 'active', 'count': 0}}


## 3. 实际应用场景

### 场景1：数据分组 - 按部门分组员工


In [4]:

# 模拟员工数据
employees = [
    {'name': '张三', 'department': '技术部', 'salary': 15000},
    {'name': '李四', 'department': '销售部', 'salary': 12000},
    {'name': '王五', 'department': '技术部', 'salary': 18000},
    {'name': '赵六', 'department': '人事部', 'salary': 10000},
    {'name': '钱七', 'department': '销售部', 'salary': 13000},
    {'name': '孙八', 'department': '技术部', 'salary': 16000},
]

print("=== 使用 defaultdict 按部门分组员工 ===")

# 按部门分组
department_groups = defaultdict(list)
for emp in employees:
    print('按部门分组 emp:',emp)
    department_groups[emp['department']].append(emp)
print('按部门分组后，department_groups:',department_groups)
# 显示分组结果
for dept, emps in department_groups.items():
    print(f"\n{dept}:")
    for emp in emps:
        print(f"  - {emp['name']}: ¥{emp['salary']:,}")

# 计算各部门平均薪资
dept_avg_salary = {}
for dept, emps in department_groups.items():
    avg_salary = sum(emp['salary'] for emp in emps) / len(emps)
    dept_avg_salary[dept] = avg_salary

print(f"\n各部门平均薪资:")
for dept, avg in dept_avg_salary.items():
    print(f"{dept}: ¥{avg:,.0f}")


=== 使用 defaultdict 按部门分组员工 ===
按部门分组 emp: {'name': '张三', 'department': '技术部', 'salary': 15000}
按部门分组 emp: {'name': '李四', 'department': '销售部', 'salary': 12000}
按部门分组 emp: {'name': '王五', 'department': '技术部', 'salary': 18000}
按部门分组 emp: {'name': '赵六', 'department': '人事部', 'salary': 10000}
按部门分组 emp: {'name': '钱七', 'department': '销售部', 'salary': 13000}
按部门分组 emp: {'name': '孙八', 'department': '技术部', 'salary': 16000}
按部门分组后，department_groups: defaultdict(<class 'list'>, {'技术部': [{'name': '张三', 'department': '技术部', 'salary': 15000}, {'name': '王五', 'department': '技术部', 'salary': 18000}, {'name': '孙八', 'department': '技术部', 'salary': 16000}], '销售部': [{'name': '李四', 'department': '销售部', 'salary': 12000}, {'name': '钱七', 'department': '销售部', 'salary': 13000}], '人事部': [{'name': '赵六', 'department': '人事部', 'salary': 10000}]})

技术部:
  - 张三: ¥15,000
  - 王五: ¥18,000
  - 孙八: ¥16,000

销售部:
  - 李四: ¥12,000
  - 钱七: ¥13,000

人事部:
  - 赵六: ¥10,000

各部门平均薪资:
技术部: ¥16,333
销售部: ¥12,500
人事部: ¥10,000


### 场景2：统计计数 - 文本词频分析


In [None]:
import re
from typing import Counter

# 示例文本
text = """
Python是一种高级编程语言，Python语法简洁明了。
Python广泛应用于数据科学、网站开发、自动化等领域。
学习Python可以提高编程效率，Python社区也非常活跃。
"""

print("=== 使用 defaultdict 进行词频统计 ===")

# 方法1：使用 defaultdict(int)
word_count = defaultdict(int)
words = re.findall(r'\b\w+\b', text.lower())

for word in words:
    word_count[word] += 1

print("词频统计结果（前10个）:")
# 按频率排序
sorted_words = sorted(word_count.items(), key=lambda x: x[1], reverse=True)
for word, count in sorted_words[:10]:
    print(f"{word}: {count}")

print(f"\n总词数: {sum(word_count.values())}")
print(f"唯一词数: {len(word_count)}")

# 方法2：对比 - 使用 Counter（collections 模块的另一个工具）
from collections import Counter
counter_result = Counter(words)
print(f"\n使用 Counter 的结果（前5个）: {counter_result.most_common(5)}")

# 统计字符频率
char_count = defaultdict(int)
for char in text:
    if char.isalpha():  # 只统计字母
        char_count[char.lower()] += 1

print(f"\n字符频率（前10个）:")
sorted_chars = sorted(char_count.items(), key=lambda x: x[1], reverse=True)
for char, count in sorted_chars[:10]:
    print(f"'{char}': {count}")


### 场景3：嵌套数据结构 - 多级分类系统


In [None]:
print("=== 嵌套 defaultdict 构建多级分类 ===")

# 创建嵌套的 defaultdict
# 第一级：按年份分组，第二级：按月份分组，第三级：按日期分组
def nested_dict():
    return defaultdict(list)

def monthly_dict():
    return defaultdict(nested_dict)

yearly_data = defaultdict(monthly_dict)

# 模拟日志数据
log_entries = [
    {'date': '2024-01-15', 'level': 'INFO', 'message': '用户登录成功'},
    {'date': '2024-01-15', 'level': 'ERROR', 'message': '数据库连接失败'},
    {'date': '2024-01-16', 'level': 'INFO', 'message': '系统启动'},
    {'date': '2024-02-01', 'level': 'WARNING', 'message': '内存使用率过高'},
    {'date': '2024-02-01', 'level': 'INFO', 'message': '定时任务执行'},
    {'date': '2023-12-25', 'level': 'INFO', 'message': '节日问候'},
]

# 按年-月-日分类存储日志
for entry in log_entries:
    date_parts = entry['date'].split('-')
    year, month, day = date_parts[0], date_parts[1], date_parts[2]
    
    yearly_data[year][month][day].append({
        'level': entry['level'],
        'message': entry['message']
    })

# 显示分类结果
print("按年-月-日分类的日志:")
for year, months in yearly_data.items():
    print(f"\n{year}年:")
    for month, days in months.items():
        print(f"  {month}月:")
        for day, logs in days.items():
            print(f"    {day}日 ({len(logs)}条日志):")
            for log in logs:
                print(f"      [{log['level']}] {log['message']}")

# 统计各级别日志数量
level_stats = defaultdict(int)
for year, months in yearly_data.items():
    for month, days in months.items():
        for day, logs in days.items():
            for log in logs:
                level_stats[log['level']] += 1

print(f"\n日志级别统计:")
for level, count in level_stats.items():
    print(f"{level}: {count}条")


### 场景4：API 数据聚合 - 用户行为分析


In [None]:
from datetime import datetime, timedelta
import random

print("=== API 数据聚合案例 ===")

# 模拟用户行为数据
def generate_user_actions():
    """生成模拟的用户行为数据"""
    actions = ['登录', '浏览商品', '加入购物车', '下单', '支付', '评价']
    users = [f'user_{i:03d}' for i in range(1, 21)]  # 20个用户
    
    data = []
    base_date = datetime.now() - timedelta(days=7)  # 最近7天
    
    for day in range(7):
        current_date = base_date + timedelta(days=day)
        for _ in range(random.randint(20, 50)):  # 每天20-50个行为
            data.append({
                'user_id': random.choice(users),
                'action': random.choice(actions),
                'timestamp': current_date.strftime('%Y-%m-%d'),
                'value': random.randint(1, 1000)  # 商品价值或其他指标
            })
    return data

user_actions = generate_user_actions()

# 使用 defaultdict 进行数据聚合
print("1. 按用户聚合行为:")
user_behavior = defaultdict(lambda: defaultdict(int))
user_values = defaultdict(list)

for action in user_actions:
    user_id = action['user_id']
    action_type = action['action']
    value = action['value']
    
    user_behavior[user_id][action_type] += 1
    user_values[user_id].append(value)

# 显示用户行为统计（前5个用户）
for i, (user_id, behaviors) in enumerate(user_behavior.items()):
    if i >= 5:  # 只显示前5个
        break
    print(f"\n{user_id}:")
    for action, count in behaviors.items():
        print(f"  {action}: {count}次")
    avg_value = sum(user_values[user_id]) / len(user_values[user_id])
    print(f"  平均价值: {avg_value:.2f}")

# 2. 按日期聚合
print(f"\n2. 按日期聚合活跃度:")
daily_activity = defaultdict(lambda: defaultdict(int))
daily_users = defaultdict(set)

for action in user_actions:
    date = action['timestamp']
    action_type = action['action']
    user_id = action['user_id']
    
    daily_activity[date][action_type] += 1
    daily_users[date].add(user_id)

for date, activities in sorted(daily_activity.items()):
    print(f"\n{date}:")
    print(f"  活跃用户数: {len(daily_users[date])}")
    for action, count in activities.items():
        print(f"  {action}: {count}次")

# 3. 计算转化率（从浏览到购买）
print(f"\n3. 用户转化分析:")
conversion_data = defaultdict(lambda: {'浏览商品': 0, '下单': 0, '支付': 0})

for action in user_actions:
    user_id = action['user_id']
    action_type = action['action']
    
    if action_type in conversion_data[user_id]:
        conversion_data[user_id][action_type] += 1

# 计算转化率
browse_users = 0
order_users = 0
pay_users = 0

for user_id, actions in conversion_data.items():
    if actions['浏览商品'] > 0:
        browse_users += 1
        if actions['下单'] > 0:
            order_users += 1
            if actions['支付'] > 0:
                pay_users += 1

print(f"浏览用户数: {browse_users}")
print(f"下单用户数: {order_users}")
print(f"支付用户数: {pay_users}")
if browse_users > 0:
    print(f"浏览->下单转化率: {order_users/browse_users*100:.1f}%")
if order_users > 0:
    print(f"下单->支付转化率: {pay_users/order_users*100:.1f}%")


## 4. 高级应用：生产级代码示例


In [None]:
import logging
from typing import Dict, List, Optional, DefaultDict
from dataclasses import dataclass
from enum import Enum

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

print("=== 生产级代码：缓存系统和数据聚合器 ===")

class CacheStrategy(Enum):
    """缓存策略枚举"""
    LRU = "lru"
    LFU = "lfu"
    TTL = "ttl"

@dataclass
class CacheStats:
    """缓存统计信息"""
    hits: int = 0
    misses: int = 0
    evictions: int = 0
    
    @property
    def hit_rate(self) -> float:
        total = self.hits + self.misses
        return (self.hits / total * 100) if total > 0 else 0.0

class ProductionCacheManager:
    """生产级缓存管理器，使用 defaultdict 优化数据结构"""
    
    def __init__(self, max_size: int = 1000):
        self.max_size = max_size
        
        # 使用 defaultdict 简化数据结构管理
        self.cache_data: DefaultDict[str, Dict] = defaultdict(dict)
        self.access_counts: DefaultDict[str, int] = defaultdict(int)
        self.access_times: DefaultDict[str, List[float]] = defaultdict(list)
        self.namespace_stats: DefaultDict[str, CacheStats] = defaultdict(CacheStats)
        
        logger.info(f"缓存管理器初始化完成，最大容量: {max_size}")
    
    def get(self, namespace: str, key: str) -> Optional[any]:
        """获取缓存数据"""
        if key in self.cache_data[namespace]:
            self.access_counts[namespace + ":" + key] += 1
            self.namespace_stats[namespace].hits += 1
            
            logger.debug(f"缓存命中: {namespace}:{key}")
            return self.cache_data[namespace][key]
        
        self.namespace_stats[namespace].misses += 1
        logger.debug(f"缓存未命中: {namespace}:{key}")
        return None
    
    def set(self, namespace: str, key: str, value: any) -> None:
        """设置缓存数据"""
        # 检查是否需要清理空间
        if self._total_items() >= self.max_size:
            self._evict_least_used()
        
        self.cache_data[namespace][key] = value
        self.access_counts[namespace + ":" + key] = 1
        
        logger.debug(f"缓存设置: {namespace}:{key}")
    
    def _total_items(self) -> int:
        """计算总缓存项数"""
        return sum(len(data) for data in self.cache_data.values())
    
    def _evict_least_used(self) -> None:
        """清理最少使用的缓存项"""
        if not self.access_counts:
            return
        
        # 找到使用次数最少的键
        least_used_key = min(self.access_counts.keys(), 
                           key=lambda k: self.access_counts[k])
        
        namespace, key = least_used_key.split(":", 1)
        
        # 清理数据
        del self.cache_data[namespace][key]
        del self.access_counts[least_used_key]
        
        # 如果命名空间为空，也清理掉
        if not self.cache_data[namespace]:
            del self.cache_data[namespace]
        
        self.namespace_stats[namespace].evictions += 1
        logger.info(f"清理缓存项: {namespace}:{key}")
    
    def get_stats(self) -> Dict[str, CacheStats]:
        """获取统计信息"""
        return dict(self.namespace_stats)
    
    def clear_namespace(self, namespace: str) -> None:
        """清空指定命名空间"""
        if namespace in self.cache_data:
            # 清理相关的访问计数
            keys_to_remove = [k for k in self.access_counts.keys() 
                            if k.startswith(f"{namespace}:")]
            for key in keys_to_remove:
                del self.access_counts[key]
            
            del self.cache_data[namespace]
            logger.info(f"清空命名空间: {namespace}")

# 测试缓存管理器
cache_manager = ProductionCacheManager(max_size=10)

# 模拟缓存操作
print("\\n测试缓存操作:")

# 设置不同命名空间的数据
cache_manager.set("users", "user_123", {"name": "张三", "age": 25})
cache_manager.set("users", "user_456", {"name": "李四", "age": 30})
cache_manager.set("products", "prod_001", {"name": "Python教程", "price": 99})
cache_manager.set("products", "prod_002", {"name": "数据结构", "price": 79})

# 测试缓存命中
user_data = cache_manager.get("users", "user_123")
print(f"获取用户数据: {user_data}")

product_data = cache_manager.get("products", "prod_001")
print(f"获取产品数据: {product_data}")

# 测试缓存未命中
missing_data = cache_manager.get("users", "user_999")
print(f"获取不存在的数据: {missing_data}")

# 触发缓存清理（添加超过容量的数据）
print("\\n触发缓存清理:")
for i in range(15):
    cache_manager.set("temp", f"item_{i}", f"data_{i}")

# 查看统计信息
print("\\n缓存统计信息:")
stats = cache_manager.get_stats()
for namespace, stat in stats.items():
    print(f"{namespace}: 命中率={stat.hit_rate:.1f}%, "
          f"命中={stat.hits}, 未命中={stat.misses}, 清理={stat.evictions}")

print(f"\\n当前缓存总项数: {cache_manager._total_items()}")


## 5. 性能优化和最佳实践


In [None]:
import time
import sys
from memory_profiler import memory_usage  # 需要安装: pip install memory_profiler

print("=== 性能对比：defaultdict vs 普通字典 ===")

def benchmark_normal_dict(n: int) -> float:
    """测试普通字典的性能"""
    start_time = time.time()
    data = {}
    
    for i in range(n):
        key = f"group_{i % 100}"
        if key not in data:
            data[key] = []
        data[key].append(i)
    
    return time.time() - start_time

def benchmark_defaultdict(n: int) -> float:
    """测试 defaultdict 的性能"""
    start_time = time.time()
    data = defaultdict(list)
    
    for i in range(n):
        key = f"group_{i % 100}"
        data[key].append(i)
    
    return time.time() - start_time

def benchmark_setdefault(n: int) -> float:
    """测试使用 setdefault 的性能"""
    start_time = time.time()
    data = {}
    
    for i in range(n):
        key = f"group_{i % 100}"
        data.setdefault(key, []).append(i)
    
    return time.time() - start_time

# 性能测试
test_sizes = [1000, 10000, 100000]

print("操作数\\t\\t普通字典\\tdefaultdict\\tsetdefault")
print("-" * 50)

for size in test_sizes:
    normal_time = benchmark_normal_dict(size)
    default_time = benchmark_defaultdict(size)
    setdefault_time = benchmark_setdefault(size)
    
    print(f"{size:,}\\t\\t{normal_time:.4f}s\\t{default_time:.4f}s\\t{setdefault_time:.4f}s")

# 内存使用对比
print(f"\\n=== 内存使用对比 ===")

def memory_test_normal_dict():
    data = {}
    for i in range(10000):
        key = f"group_{i % 100}"
        if key not in data:
            data[key] = []
        data[key].append(i)
    return data

def memory_test_defaultdict():
    data = defaultdict(list)
    for i in range(10000):
        key = f"group_{i % 100}"
        data[key].append(i)
    return data

try:
    # 测试内存使用（如果安装了 memory_profiler）
    normal_memory = memory_usage(memory_test_normal_dict)
    default_memory = memory_usage(memory_test_defaultdict)
    
    print(f"普通字典最大内存使用: {max(normal_memory):.2f} MB")
    print(f"defaultdict最大内存使用: {max(default_memory):.2f} MB")
except ImportError:
    print("未安装 memory_profiler，跳过内存测试")
    print("安装命令: pip install memory_profiler")

# 最佳实践示例
print(f"\\n=== defaultdict 最佳实践 ===")

class BestPracticesDemo:
    """展示 defaultdict 最佳实践"""
    
    @staticmethod
    def practice_1_avoid_lambda_when_possible():
        """实践1：避免不必要的 lambda"""
        print("❌ 不推荐：")
        bad_dict = defaultdict(lambda: [])  # 不必要的 lambda
        print(f"defaultdict(lambda: []) -> {bad_dict['key']}")
        
        print("✅ 推荐：")
        good_dict = defaultdict(list)  # 直接使用类型
        print(f"defaultdict(list) -> {good_dict['key']}")
    
    @staticmethod
    def practice_2_type_annotations():
        """实践2：使用类型注解"""
        print("\\n✅ 使用类型注解提高代码可读性：")
        
        # 明确的类型注解
        user_groups: DefaultDict[str, List[str]] = defaultdict(list)
        user_counts: DefaultDict[str, int] = defaultdict(int)
        
        print("代码更清晰，IDE 支持更好")
    
    @staticmethod
    def practice_3_convert_when_needed():
        """实践3：必要时转换为普通字典"""
        print("\\n✅ API 返回时转换为普通字典：")
        
        data = defaultdict(list)
        data['users'].append('张三')
        data['users'].append('李四')
        
        # 转换为普通字典进行 JSON 序列化
        api_response = dict(data)
        print(f"API 响应: {api_response}")
        print(f"可以安全序列化为 JSON: {json.dumps(api_response, ensure_ascii=False)}")
    
    @staticmethod
    def practice_4_careful_with_nesting():
        """实践4：谨慎处理嵌套结构"""
        print("\\n✅ 嵌套 defaultdict 的正确方式：")
        
        # 方式1：使用 lambda
        nested1 = defaultdict(lambda: defaultdict(int))
        nested1['user']['login_count'] += 1
        print(f"方式1结果: {dict(nested1)}")
        
        # 方式2：定义辅助函数（更清晰）
        def create_user_stats():
            return defaultdict(int)
        
        nested2 = defaultdict(create_user_stats)
        nested2['user']['login_count'] += 1
        print(f"方式2结果: {dict(nested2)}")

# 运行最佳实践演示
demo = BestPracticesDemo()
demo.practice_1_avoid_lambda_when_possible()
demo.practice_2_type_annotations()
demo.practice_3_convert_when_needed()
demo.practice_4_careful_with_nesting()


## 6. 常见陷阱和注意事项


In [None]:
print("=== 常见陷阱和解决方案 ===")

# 陷阱1：意外创建键
print("⚠️  陷阱1：仅仅访问就会创建键")
data = defaultdict(list)
print(f"访问前的键: {list(data.keys())}")

# 仅仅检查会创建键！
if data['nonexistent_key']:  # 这会创建键！
    print("不会执行")

print(f"访问后的键: {list(data.keys())}")
print(f"值: {data['nonexistent_key']}")

print("\\n✅ 解决方案：使用 in 操作符或 get 方法")
safe_data = defaultdict(list)
print(f"安全检查前的键: {list(safe_data.keys())}")

# 安全的检查方式
if 'nonexistent_key' in safe_data:
    print("不会执行")

# 或者使用 get 方法
value = safe_data.get('nonexistent_key')
print(f"安全检查后的键: {list(safe_data.keys())}")
print(f"get 返回值: {value}")

# 陷阱2：序列化问题
print("\\n⚠️  陷阱2：JSON 序列化问题")
try:
    json_data = defaultdict(list)
    json_data['items'].append('test')
    
    # 这会失败！
    json_str = json.dumps(json_data)
except TypeError as e:
    print(f"序列化失败: {e}")

print("✅ 解决方案：转换为普通字典")
json_data = defaultdict(list)
json_data['items'].append('test')
json_str = json.dumps(dict(json_data))
print(f"序列化成功: {json_str}")

# 陷阱3：工厂函数的副作用
print("\\n⚠️  陷阱3：工厂函数有副作用")

class Counter:
    def __init__(self):
        self.count = 0
    
    def __call__(self):
        self.count += 1
        print(f"工厂函数被调用第 {self.count} 次")
        return []

counter = Counter()
problematic_dict = defaultdict(counter)

print("访问键会触发工厂函数:")
_ = problematic_dict['key1']
_ = problematic_dict['key2']
_ = problematic_dict['key3']

print("\\n✅ 解决方案：使用无副作用的工厂函数")
clean_dict = defaultdict(list)  # 无副作用

# 陷阱4：嵌套 defaultdict 的混乱
print("\\n⚠️  陷阱4：嵌套结构可能导致混乱")

# 问题代码
messy_nested = defaultdict(lambda: defaultdict(lambda: defaultdict(int)))
messy_nested['a']['b']['c'] = 1

print("嵌套太深，难以理解:")
print(messy_nested['a']['b']['c'])

print("\\n✅ 解决方案：使用清晰的结构")

class UserStats:
    def __init__(self):
        self.login_count = 0
        self.page_views = 0
        self.actions = []
    
    def to_dict(self):
        return {
            'login_count': self.login_count,
            'page_views': self.page_views,
            'actions': self.actions
        }

clean_nested = defaultdict(UserStats)
clean_nested['user1'].login_count += 1
clean_nested['user1'].actions.append('login')

print("清晰的结构:")
print(clean_nested['user1'].to_dict())

# 陷阱5：内存泄漏风险
print("\\n⚠️  陷阱5：意外的内存使用")

def memory_leak_example():
    # 如果键是动态生成的，可能导致内存泄漏
    leak_dict = defaultdict(list)
    
    # 模拟随机键（实际使用中可能来自用户输入）
    import random
    for i in range(1000):
        random_key = f"user_{random.randint(1, 1000000)}"
        leak_dict[random_key].append(f"data_{i}")
    
    return len(leak_dict)

leaked_keys = memory_leak_example()
print(f"创建了 {leaked_keys} 个键，可能导致内存问题")

print("\\n✅ 解决方案：限制键的数量或定期清理")

class BoundedDefaultDict(defaultdict):
    def __init__(self, default_factory, max_size=1000):
        super().__init__(default_factory)
        self.max_size = max_size
    
    def __setitem__(self, key, value):
        if len(self) >= self.max_size and key not in self:
            # 移除最老的键（简单实现）
            oldest_key = next(iter(self))
            del self[oldest_key]
            print(f"移除最老的键: {oldest_key}")
        
        super().__setitem__(key, value)
    
    def __getitem__(self, key):
        if len(self) >= self.max_size and key not in self:
            oldest_key = next(iter(self))
            del self[oldest_key]
            print(f"移除最老的键: {oldest_key}")
        
        return super().__getitem__(key)

bounded_dict = BoundedDefaultDict(list, max_size=3)
for i in range(5):
    bounded_dict[f'key_{i}'].append(f'value_{i}')

print(f"最终键数量: {len(bounded_dict)}")
print(f"剩余的键: {list(bounded_dict.keys())}")


## 7. 总结与学习建议

### 🎯 核心要点回顾

1. **基本原理**：`defaultdict` 通过工厂函数自动创建缺失键的默认值，避免 `KeyError`
2. **性能优势**：比传统的键检查方式更高效，特别是在大量数据操作时
3. **常用场景**：分组、计数、嵌套结构、数据聚合
4. **类型安全**：配合类型注解使用，提高代码质量

### 📚 学习路线建议

**初学者**：
- 掌握基本语法和常用工厂函数（`list`, `int`, `set`, `dict`）
- 练习简单的分组和计数任务
- 理解与普通字典的区别

**中级开发者**：
- 学会设计嵌套数据结构
- 掌握性能优化技巧
- 了解常见陷阱和解决方案

**高级开发者**：
- 结合其他 collections 模块工具使用
- 设计生产级的数据处理系统
- 自定义 defaultdict 子类

### 🔧 实战练习建议

1. **日志分析器**：按时间、级别、模块分组分析日志
2. **用户行为追踪**：分析用户路径和转化率
3. **数据报表生成器**：多维度数据聚合和统计
4. **缓存系统**：实现带有统计功能的缓存管理器

### 🚀 进阶学习

- **collections.Counter**：专门用于计数的工具
- **collections.ChainMap**：多字典链式查找
- **collections.OrderedDict**：保序字典
- **functools.lru_cache**：与缓存相关的装饰器


In [None]:
# 最终练习：综合应用示例
print("=== 🎯 综合练习：网站访问统计分析器 ===")

class WebsiteAnalyzer:
    """网站访问统计分析器 - 综合应用 defaultdict"""
    
    def __init__(self):
        # 使用不同类型的 defaultdict 来存储统计数据
        self.page_views: DefaultDict[str, int] = defaultdict(int)
        self.user_sessions: DefaultDict[str, List[Dict]] = defaultdict(list)
        self.hourly_traffic: DefaultDict[int, int] = defaultdict(int)
        self.referrer_stats: DefaultDict[str, int] = defaultdict(int)
        self.device_types: DefaultDict[str, Set[str]] = defaultdict(set)
        
        # 用于计算平均值的数据
        self.response_times: DefaultDict[str, List[float]] = defaultdict(list)
        
        logger.info("网站分析器初始化完成")
    
    def log_visit(self, user_id: str, page: str, timestamp: str, 
                  referrer: str = "direct", device: str = "desktop",
                  response_time: float = 0.1):
        """记录一次访问"""
        
        # 提取小时信息
        hour = int(timestamp.split(':')[0]) if ':' in timestamp else 12
        
        # 更新各项统计
        self.page_views[page] += 1
        self.user_sessions[user_id].append({
            'page': page,
            'timestamp': timestamp,
            'referrer': referrer,
            'device': device,
            'response_time': response_time
        })
        self.hourly_traffic[hour] += 1
        self.referrer_stats[referrer] += 1
        self.device_types[device].add(user_id)
        self.response_times[page].append(response_time)
        
        logger.debug(f"记录访问: {user_id} -> {page}")
    
    def get_popular_pages(self, top_n: int = 5) -> List[tuple]:
        """获取最受欢迎的页面"""
        return sorted(self.page_views.items(), 
                     key=lambda x: x[1], reverse=True)[:top_n]
    
    def get_user_behavior(self, user_id: str) -> Dict:
        """分析用户行为"""
        sessions = self.user_sessions[user_id]
        if not sessions:
            return {'error': 'User not found'}
        
        pages_visited = [session['page'] for session in sessions]
        unique_pages = len(set(pages_visited))
        
        return {
            'total_visits': len(sessions),
            'unique_pages': unique_pages,
            'pages_visited': pages_visited,
            'devices_used': list(set(s['device'] for s in sessions)),
            'referrer_sources': list(set(s['referrer'] for s in sessions))
        }
    
    def get_performance_stats(self) -> Dict:
        """获取性能统计"""
        performance = {}
        for page, times in self.response_times.items():
            if times:
                performance[page] = {
                    'avg_response_time': sum(times) / len(times),
                    'min_response_time': min(times),
                    'max_response_time': max(times),
                    'total_requests': len(times)
                }
        return performance
    
    def generate_report(self) -> str:
        """生成完整的分析报告"""
        report = []
        report.append("📊 网站访问统计报告")
        report.append("=" * 30)
        
        # 热门页面
        popular_pages = self.get_popular_pages()
        report.append(f"\\n🔥 热门页面 (前5名):")
        for i, (page, views) in enumerate(popular_pages, 1):
            report.append(f"{i}. {page}: {views} 次访问")
        
        # 流量分布
        report.append(f"\\n⏰ 24小时流量分布:")
        for hour in sorted(self.hourly_traffic.keys()):
            count = self.hourly_traffic[hour]
            bar = "█" * (count // 5)  # 简单的柱状图
            report.append(f"{hour:02d}:00 - {count:3d} 访问 {bar}")
        
        # 来源统计
        report.append(f"\\n🔗 流量来源:")
        total_visits = sum(self.referrer_stats.values())
        for referrer, count in sorted(self.referrer_stats.items(), 
                                    key=lambda x: x[1], reverse=True):
            percentage = (count / total_visits * 100) if total_visits > 0 else 0
            report.append(f"{referrer}: {count} ({percentage:.1f}%)")
        
        # 设备统计
        report.append(f"\\n📱 设备类型统计:")
        for device, users in self.device_types.items():
            report.append(f"{device}: {len(users)} 用户")
        
        return "\\n".join(report)

# 创建分析器并模拟数据
analyzer = WebsiteAnalyzer()

# 模拟访问数据
sample_visits = [
    ("user_001", "/home", "09:30", "google.com", "mobile", 0.12),
    ("user_001", "/products", "09:32", "direct", "mobile", 0.15),
    ("user_002", "/home", "10:15", "facebook.com", "desktop", 0.08),
    ("user_002", "/about", "10:20", "direct", "desktop", 0.10),
    ("user_003", "/products", "11:45", "google.com", "tablet", 0.20),
    ("user_001", "/contact", "14:30", "direct", "mobile", 0.18),
    ("user_004", "/home", "15:20", "twitter.com", "desktop", 0.09),
    ("user_004", "/products", "15:25", "direct", "desktop", 0.14),
    ("user_005", "/home", "16:45", "direct", "mobile", 0.11),
    ("user_003", "/home", "18:30", "google.com", "tablet", 0.13),
]

# 记录所有访问
for visit in sample_visits:
    analyzer.log_visit(*visit)

# 生成报告
report = analyzer.generate_report()
print(report)

# 分析特定用户
print(f"\\n👤 用户 user_001 行为分析:")
user_behavior = analyzer.get_user_behavior("user_001")
for key, value in user_behavior.items():
    print(f"{key}: {value}")

# 性能统计
print(f"\\n⚡ 页面性能统计:")
perf_stats = analyzer.get_performance_stats()
for page, stats in perf_stats.items():
    print(f"{page}:")
    print(f"  平均响应时间: {stats['avg_response_time']:.3f}s")
    print(f"  请求次数: {stats['total_requests']}")

print("\\n🎉 defaultdict 学习完成！现在你已经掌握了这个强大的数据结构。")
