# 使用示例 - 用户订单数据处理

这是一个通用的批处理示例，展示如何使用 `BatchProcessor` 框架批量处理用户订单数据；示例中的订单 API 与用户 API 均返回模拟数据。

## 1. 导入依赖

In [None]:
from batch_processor import BatchProcessor
import pandas as pd
import requests
import json
from typing import Dict, Any
from cachetools import cached, LRUCache
from collections import defaultdict

# Shared LRU cache backing every API method decorated with ``api_cache``.
cache = LRUCache(maxsize=1000)


def api_cache(func):
    """Memoize *func* in the shared ``cache``, namespacing keys by function.

    ``cachetools.cached`` builds its key from the call arguments only. If two
    different functions were wrapped with the same cache and called with equal
    arguments, each would return the other's cached result. Prefixing the key
    with the function's qualified name keeps their entries apart.

    When decorating a method, ``self`` is passed positionally and so becomes
    part of the key. The instance therefore stays referenced by the cache
    until its entries are evicted.
    """
    from functools import partial

    from cachetools.keys import hashkey

    return cached(cache, key=partial(hashkey, func.__qualname__))(func)

## 2. 实现用户订单处理器

In [None]:
class UserOrderProcessor(BatchProcessor):
    """Batch processor that enriches orders with status, amount and user level.

    Rows come from the source returned by ``get_data_source``. The processor
    fills the ``result_fields`` declared in ``define_schema``. The order and
    user APIs are mocked; the real ``requests`` calls are left commented out.
    """

    # Maximum number of user IDs sent in a single user-API request.
    USER_BATCH_SIZE = 50

    def get_data_source(self):
        """Return the data source: a CSV file of orders."""
        return 'orders.csv'

    def define_schema(self) -> Dict[str, list]:
        """Return the table schema: framework control fields and result fields."""
        return {
            'control_fields': [
                'is_processed',
                'retry_count'
            ],
            'result_fields': [
                'order_status',      # order status
                'total_amount',      # order total amount
                'user_level'         # user tier
            ]
        }

    @api_cache
    def _fetch_order_info_cached(self, order_ids_tuple):
        """Fetch order details for a tuple of order IDs (cached).

        The IDs arrive as a tuple so the arguments are hashable and can form
        the cache key. Returns ``{order_id: detail_dict}``; the data is mocked.
        """
        # Request that the real API call would send (see commented call below).
        api_url = "https://api.example.com/v1/orders/batch"

        order_ids = list(order_ids_tuple)
        payload = json.dumps({"order_ids": order_ids})
        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer your-api-token',
            'Accept': '*/*'
        }

        # Real API call:
        # response = requests.post(api_url, headers=headers, data=payload, timeout=30)
        # response.raise_for_status()

        # Mocked response: every order is a completed single-item order.
        return {
            order_id: {
                'order_id': order_id,
                'status': 'completed',
                'amount': 199.99,
                'items': [{'name': 'Product A', 'price': 199.99}]
            }
            for order_id in order_ids
        }

    def fetch_order_info(self, order_ids):
        """Return ``{order_id: detail_dict}`` for the given order IDs."""
        return self._fetch_order_info_cached(tuple(order_ids))

    @api_cache
    def _fetch_user_info_cached(self, user_ids_tuple):
        """Fetch user records for a tuple of user IDs (cached).

        Requests are split into chunks of ``USER_BATCH_SIZE`` IDs. Returns a
        list of user dicts with ``id``, ``level``, ``email`` and ``created_at``;
        the data is mocked.
        """
        user_ids = list(user_ids_tuple)
        api_url = "https://api.example.com/v1/users/batch"

        headers = {
            'Content-Type': 'application/json',
            'Authorization': 'Bearer your-api-token',
            'Accept': '*/*'
        }

        result = []
        batch_size = self.USER_BATCH_SIZE

        for start in range(0, len(user_ids), batch_size):
            batch_user_ids = user_ids[start:start + batch_size]

            payload = json.dumps({
                "user_ids": batch_user_ids,
                "fields": ["id", "level", "email", "created_at"]
            })

            # Real API call:
            # response = requests.post(api_url, headers=headers, data=payload, timeout=30)
            # response.raise_for_status()

            # Mocked response: every user is a 'gold' member.
            result.extend(
                {
                    'id': user_id,
                    'level': 'gold',
                    'email': f'user{user_id}@example.com',
                    'created_at': '2023-01-01'
                }
                for user_id in batch_user_ids
            )

        return result

    def fetch_user_info(self, user_ids):
        """Return a list of user dicts for the given user IDs."""
        return self._fetch_user_info_cached(tuple(user_ids))

    @staticmethod
    def _mock_user_id(order_id):
        """Derive a placeholder user ID from the last 4 characters of an order ID."""
        # str() is required: numeric order IDs (e.g. ints parsed from CSV)
        # cannot be sliced and would otherwise fail the whole batch.
        return f"user_{str(order_id)[-4:]}"

    def process_business_logic(self, batch_data: pd.DataFrame) -> pd.DataFrame:
        """Fill ``order_status``, ``total_amount`` and ``user_level`` for a batch.

        Args:
            batch_data: rows of the current batch; must contain ``order_id``.

        Returns:
            ``batch_data`` (modified in place) with the three result columns
            set. If any step raises, the error is logged and every row gets
            the fallback values ``'error'`` / ``0`` / ``'unknown'`` instead.
        """
        try:
            # 1. Order details for every distinct order in the batch.
            order_info = self.fetch_order_info(batch_data['order_id'].unique().tolist())

            # 2. Map each order to its user (mocked: derived from the order ID).
            order_id_2_user_id = {
                order_id: self._mock_user_id(order_id) for order_id in order_info
            }

            # 3. User details. Sorting makes the ID tuple, and therefore the
            #    cache key, independent of set iteration order.
            user_data = self.fetch_user_info(sorted(set(order_id_2_user_id.values())))
            user_id_2_user_info = {item['id']: item for item in user_data}

            # 4. Resolve the result fields row by row.
            statuses, amounts, levels = [], [], []
            for order_id in batch_data['order_id']:
                order_detail = order_info.get(order_id, {})
                order_status = order_detail.get('status', 'unknown')
                total_amount = order_detail.get('amount', 0)

                user_id = order_id_2_user_id.get(order_id, '')
                user_level = user_id_2_user_info.get(user_id, {}).get('level', 'bronze')

                statuses.append(order_status)
                amounts.append(total_amount)
                levels.append(user_level)

                print(f"订单: {order_id}, 状态: {order_status}, 金额: {total_amount}, 用户等级: {user_level}")

            # Assign whole columns at once. Per-row .loc writes are slower, and
            # with duplicate index labels they overwrite each other's rows.
            batch_data['order_status'] = statuses
            batch_data['total_amount'] = amounts
            batch_data['user_level'] = levels
            return batch_data

        except Exception as e:
            self.logger.error(f"处理业务逻辑异常: {str(e)}")
            # Keep the rows, but mark every result field with a fallback value.
            batch_data['order_status'] = 'error'
            batch_data['total_amount'] = 0
            batch_data['user_level'] = 'unknown'
            return batch_data

## 3. 执行处理

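### 调试模式 - 先测试少量数据

请先运行下方第二个代码单元：它只处理 2 个批次（`debug_batch_times=2`），用于验证业务逻辑。

### 生产模式 - 处理全部数据

下方第一个代码单元默认处于注释状态；确认调试结果无误后，取消注释并运行，即可处理全部数据。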

In [None]:
# 确认调试结果正确后，处理全部数据
# print("开始生产模式处理...")
# processor = UserOrderProcessor(
#     batch_size=100,
#     table_name='order_table',
#     db_name='orders.db',
#     max_retries=3
# )
# processor.run()  # 不传参数，处理全部数据

In [None]:
# Instantiate the processor with the same settings as the production cell.
processor = UserOrderProcessor(
    db_name='orders.db',
    table_name='order_table',
    batch_size=100,
    max_retries=3,
)

# 🔧 Debug mode: run only 2 batches to validate the business logic first.
print("开始调试模式处理...")
processor.run(debug_batch_times=2)

## 4. 查看处理结果

In [None]:
# Framework-reported processing statistics.
stats = processor.get_statistics()
print("处理统计:")
print(f"总记录数: {stats['total']}")
print(f"已处理: {stats['processed']}")
print(f"待处理: {stats['pending']}")
print(f"处理失败: {stats['failed']}")

# Success rate over processed rows, computed directly in SQLite.
import sqlite3
from contextlib import closing

# NULLIF guards the division and COALESCE replaces the NULLs SQLite returns
# for an empty selection with 0, so nothing prints as None/nan before any row
# has been processed.
SUCCESS_STATS_SQL = """
    SELECT 
        COUNT(*) as total_processed,
        COALESCE(SUM(CASE WHEN order_status != 'error' THEN 1 ELSE 0 END), 0) as success,
        COALESCE(ROUND(SUM(CASE WHEN order_status != 'error' THEN 1 ELSE 0 END) * 100.0 / NULLIF(COUNT(*), 0), 2), 0.0) as success_rate
    FROM order_table 
    WHERE is_processed = 1
"""

# closing() releases the connection even if the query fails.
with closing(sqlite3.connect('orders.db')) as conn:
    success_stats = pd.read_sql(SUCCESS_STATS_SQL, conn)

# Read each value from its own column. DataFrame.iloc[0] would upcast the
# integer count to float (e.g. "150.0") because the row mixes int and float.
print(f"\n处理结果统计:")
print(f"成功处理: {success_stats['success'].iloc[0]}")
print(f"成功率: {success_stats['success_rate'].iloc[0]}%")

In [None]:
# Inspect samples of processed orders.
import sqlite3
from contextlib import closing

# Up to 5 processed orders with status 'completed'.
COMPLETED_SAMPLE_SQL = """
    SELECT order_id, customer_email, order_status, total_amount, user_level
    FROM order_table 
    WHERE is_processed = 1 AND order_status = 'completed' 
    LIMIT 5
"""

# Count of processed orders per user level, most common first.
LEVEL_DISTRIBUTION_SQL = """
    SELECT user_level, COUNT(*) as count
    FROM order_table 
    WHERE is_processed = 1
    GROUP BY user_level
    ORDER BY count DESC
"""

# closing() releases the connection even if a query or display() fails.
with closing(sqlite3.connect('orders.db')) as conn:
    print("已完成订单样本:")
    completed_sample = pd.read_sql(COMPLETED_SAMPLE_SQL, conn)
    display(completed_sample)

    print("\n用户等级分布:")
    level_distribution = pd.read_sql(LEVEL_DISTRIBUTION_SQL, conn)
    display(level_distribution)

## 5. 导出结果

In [None]:
# Export every processed row through the framework's exporter.
processor.export_results('order_results.csv', only_processed=True)

# Subset exports straight from SQLite.
import sqlite3
from contextlib import closing

# Processed orders with status 'completed'.
COMPLETED_ORDERS_SQL = """
    SELECT * FROM order_table 
    WHERE is_processed = 1 AND order_status = 'completed'
"""

# Processed orders from premium (gold / platinum) users.
PREMIUM_ORDERS_SQL = """
    SELECT * FROM order_table 
    WHERE is_processed = 1 AND user_level IN ('gold', 'platinum')
"""

# NOTE(review): the two files below are written tab-separated (sep='\t')
# despite their .csv extension -- confirm this is intended before other tools
# read them as comma-separated CSV.
# closing() releases the connection even if a query or file write fails.
with closing(sqlite3.connect('orders.db')) as conn:
    completed_df = pd.read_sql(COMPLETED_ORDERS_SQL, conn)
    completed_df.to_csv('completed_orders.csv', index=False, sep='\t')

    premium_df = pd.read_sql(PREMIUM_ORDERS_SQL, conn)
    premium_df.to_csv('premium_user_orders.csv', index=False, sep='\t')

print(f"结果导出完成:")
print(f"- 全部结果: order_results.csv")
print(f"- 已完成订单: completed_orders.csv ({len(completed_df)}条)")
print(f"- 高级用户订单: premium_user_orders.csv ({len(premium_df)}条)")