# In [None]:
import redis
import pickle
import time
import pandas as pd
import os
import pyarrow as pa
import pyarrow.parquet as pq
from io import StringIO
from datetime import datetime
from typing import Dict, Optional, List, Tuple

class MinuteDataProcessor:
    """Consume minute-bar results from a Redis queue and merge them into per-stock Parquet files.

    Every queue message is unpickled and treated as a dict carrying ``task_id``,
    ``status`` and either ``result`` (CSV text) or ``error``. Successful payloads
    go through three stages:

    1. parse and normalize the CSV, then save a copy in ``temp_download_dir``;
    2. print data-quality diagnostics (bar counts per stock per day, null counts);
    3. merge into ``<formal_data_root>/stock_code=<code>/data.parquet`` and
       delete the temporary CSV.
    """

    # Default root of the partitioned production store (one sub-directory per stock).
    DEFAULT_FORMAL_DATA_ROOT = r"D:\workspace\xiaoyao\data\stock_minutely_price"
    # Columns every incoming CSV must contain.
    REQUIRED_COLS = ('date', 'stock_code', 'time', 'open', 'close', 'high', 'low', 'volume')
    # Columns coerced to numbers; unparsable values become NaN rather than 0.
    NUMERIC_COLS = ('open', 'close', 'high', 'low', 'volume')
    # Columns identifying one minute bar; used to de-duplicate when merging.
    DEDUP_KEYS = ('date', 'time', 'stock_code')
    # Bars expected per stock per trading day; only used for stage-2 diagnostics.
    EXPECTED_BARS_PER_DAY = 240

    def __init__(
        self,
        config_path: str = "redis.conf",
        formal_data_root: Optional[str] = None,
        temp_download_dir: Optional[str] = None,
        idle_timeout: int = 1800,
    ):
        """Connect to Redis and prepare the storage directories.

        Args:
            config_path: ``key=value`` file with ``host``/``port``/``password`` entries.
            formal_data_root: Root of the partitioned Parquet store; defaults to
                ``DEFAULT_FORMAL_DATA_ROOT``.
            temp_download_dir: Directory for intermediate CSV files; defaults to
                ``temp_minute_downloads`` under the current working directory.
            idle_timeout: Seconds without any queue message before the listener exits.

        Raises:
            SystemExit: if Redis does not answer PING (see ``_test_redis_connection``).
        """
        self.redis_config = self._load_redis_config(config_path)
        self.redis = redis.Redis(
            host=self.redis_config["host"],
            port=self.redis_config["port"],
            password=self.redis_config["password"],
            decode_responses=False,  # payloads are raw pickled bytes
        )
        self.result_queue = "function_results"
        self.task_metadata = "task_metadata"
        self.formal_data_root = formal_data_root or self.DEFAULT_FORMAL_DATA_ROOT
        self.temp_download_dir = temp_download_dir or os.path.join(os.getcwd(), "temp_minute_downloads")
        self.idle_timeout = idle_timeout
        self._test_redis_connection()
        self._init_storage()

    def _load_redis_config(self, config_path: str) -> Dict[str, object]:
        """Read Redis connection settings from a simple ``key=value`` file.

        Recognised keys are ``host``, ``port`` (int) and ``password``; other lines
        (including ``#`` comments) are ignored, and whitespace around keys and values
        is stripped. If the file cannot be read or ``port`` is not an integer, a
        warning is printed and the built-in defaults are returned unchanged, so no
        partially-parsed values leak through.
        """
        defaults: Dict[str, object] = {"host": "localhost", "port": 6379, "password": ""}
        parsed: Dict[str, object] = {}
        try:
            with open(config_path, "r", encoding="utf-8") as f:
                for raw_line in f:
                    key, sep, value = raw_line.strip().partition("=")
                    if not sep:
                        continue
                    key, value = key.strip(), value.strip()
                    if key == "port":
                        parsed["port"] = int(value)
                    elif key in ("host", "password"):
                        parsed[key] = value
        except (OSError, ValueError) as e:
            print(f"⚠️ 配置文件读取失败，使用默认配置: {e}")
            return defaults
        return {**defaults, **parsed}

    def _test_redis_connection(self):
        """PING Redis once; abort the process with exit code 1 if it is unreachable."""
        try:
            self.redis.ping()
            print(f"✅ 处理器Redis连接成功 | {self.redis_config['host']}:{self.redis_config['port']}")
        except Exception as e:
            print(f"❌ 处理器Redis连接失败: {e}")
            raise SystemExit(1)

    def _init_storage(self):
        """Create the formal Parquet root and the temporary CSV directory if missing."""
        os.makedirs(self.formal_data_root, exist_ok=True)
        os.makedirs(self.temp_download_dir, exist_ok=True)
        print(f"✅ 正式数据分区根目录: {self.formal_data_root}")
        print(f"✅ 临时CSV目录: {self.temp_download_dir}")

    @staticmethod
    def _safe_file_stem(task_id) -> str:
        """Turn an arbitrary task id into a file-name stem with no path separators.

        ``task_id`` arrives from the queue payload, so characters other than
        alphanumerics, ``-``, ``_`` and ``.`` are replaced with ``_``. This keeps
        the temp file inside ``temp_download_dir``.
        """
        stem = "".join(ch if ch.isalnum() or ch in "-_." else "_" for ch in str(task_id))
        return stem or "task"

    # Stage 1: parse the CSV payload and save it to the temporary directory.
    def _stage1_download_to_temp(self, csv_str: Optional[str], task_id: str) -> Tuple[Optional[pd.DataFrame], Optional[str]]:
        """Parse and normalize a CSV payload, and save a copy as ``<task_id>_raw.csv``.

        Numeric columns are coerced with NaN for bad/missing values (not 0), and
        ``time`` is normalized to ``'%Y-%m-%d %H:%M:%S'`` strings.

        Args:
            csv_str: CSV text from the worker; ``None`` or blank is treated as empty,
                and ``bytes`` is decoded as UTF-8.
            task_id: Task identifier, used in log lines and the temp file name.

        Returns:
            ``(df, temp_file_path)`` on success, ``(None, None)`` on empty input or
            any parse/validation error (which is printed).
        """
        if isinstance(csv_str, bytes):
            csv_str = csv_str.decode("utf-8", errors="replace")
        if not isinstance(csv_str, str) or not csv_str.strip():
            print(f"⚠️ 任务{task_id}返回空数据，跳过")
            return None, None

        try:
            df = pd.read_csv(StringIO(csv_str))
            missing_cols = [col for col in self.REQUIRED_COLS if col not in df.columns]
            if missing_cols:
                raise ValueError(f"缺少核心字段: {missing_cols}")

            for col in self.NUMERIC_COLS:
                df[col] = pd.to_numeric(df[col], errors='coerce')  # keep gaps as NaN
            # NOTE(review): if upstream 'time' holds time-of-day only (e.g. '09:31:00'),
            # pd.to_datetime fills in today's date; confirm the source includes the date.
            df['time'] = pd.to_datetime(df['time']).dt.strftime('%Y-%m-%d %H:%M:%S')

            temp_file_path = os.path.join(self.temp_download_dir, f"{self._safe_file_stem(task_id)}_raw.csv")
            df.to_csv(temp_file_path, index=False, encoding='utf-8')
            print(f"📥 任务{task_id}临时CSV已保存: {temp_file_path}")
            return df, temp_file_path
        except Exception as e:
            print(f"❌ 任务{task_id}CSV处理失败: {str(e)[:100]}")
            return None, None

    def _find_abnormal_stocks(self, df: pd.DataFrame) -> List[str]:
        """Return stock codes having any day whose bar count differs from EXPECTED_BARS_PER_DAY.

        Counts are taken per ``(stock_code, date)``, so multi-day payloads are judged
        day by day. Codes are returned once each, in first-seen group order.
        """
        if df.empty:
            return []
        daily_counts = df.groupby(['stock_code', 'date']).size()
        abnormal = daily_counts[daily_counts != self.EXPECTED_BARS_PER_DAY]
        return abnormal.index.get_level_values(0).unique().tolist()

    # Stage 2: data-quality diagnostics (informational only).
    def _stage2_verify_temp_data(self, df: pd.DataFrame, task_id: str) -> bool:
        """Print bar-count and null-value diagnostics for a parsed payload.

        Abnormal bar counts or nulls are reported but do NOT fail the task. Returns
        ``False`` only if computing the diagnostics itself raises.
        """
        try:
            print(f"\n📊 任务{task_id}数据校验:")
            print(f"总记录数: {len(df)} | 涉及股票数: {df['stock_code'].nunique()}")

            expected = self.EXPECTED_BARS_PER_DAY
            abnormal_stocks = self._find_abnormal_stocks(df)
            if abnormal_stocks:
                print(f"⚠️ 异常股票（记录数≠{expected}）: {abnormal_stocks[:5]}（共{len(abnormal_stocks)}只）")
            else:
                print(f"✅ 所有股票记录数正常（单股票单日{expected}条）")

            # Null counts per column, as plain ints so they print without numpy noise.
            null_counts = df.isnull().sum()
            null_stats = {col: int(count) for col, count in null_counts.items() if count > 0}
            if null_stats:
                print(f"ℹ️ 存在空值的字段: {list(null_stats)} | 空值统计: {null_stats}")
            else:
                print("✅ 无空值数据")

            return True
        except Exception as e:
            print(f"❌ 任务{task_id}数据校验失败: {str(e)[:100]}")
            return False

    @classmethod
    def _merge_stock_frames(cls, existing_df: Optional[pd.DataFrame], new_df: pd.DataFrame) -> pd.DataFrame:
        """Concatenate stored and incoming rows and drop duplicate bars.

        Duplicates are judged on ``DEDUP_KEYS``. The first occurrence wins, so
        already-stored rows take precedence over incoming ones. Duplicates inside
        ``new_df`` alone are removed too. The result has a fresh RangeIndex.
        """
        frames = [new_df] if existing_df is None else [existing_df, new_df]
        combined = pd.concat(frames, ignore_index=True)
        return combined.drop_duplicates(subset=list(cls.DEDUP_KEYS), keep='first').reset_index(drop=True)

    @staticmethod
    def _write_parquet_atomic(df: pd.DataFrame, path: str) -> None:
        """Write ``df`` to ``path`` via a sibling temp file and ``os.replace``.

        If writing fails, the previous ``path`` contents are left intact and the
        temp file is removed. The pandas index is not stored.
        """
        directory, filename = os.path.split(path)
        # Dot-prefixed so dataset scanners that skip hidden files ignore it mid-write.
        tmp_path = os.path.join(directory, f".{filename}.tmp")
        try:
            table = pa.Table.from_pandas(df, preserve_index=False)
            pq.write_table(table, tmp_path, compression="snappy")
            os.replace(tmp_path, path)
        except BaseException:
            try:
                if os.path.exists(tmp_path):
                    os.remove(tmp_path)
            except OSError:
                pass
            raise

    # Stage 3: merge into the formal per-stock Parquet files, then delete the temp CSV.
    def _stage3_merge_and_clean(self, df: pd.DataFrame, task_id: str, temp_file_path: str) -> bool:
        """Merge each stock's rows into ``stock_code=<code>/data.parquet`` and remove the temp CSV.

        Rows with a missing ``stock_code`` are skipped with a warning. On any error the
        temp CSV is kept for troubleshooting and ``False`` is returned. Stocks processed
        before the error keep their updated files.
        """
        try:
            missing_code = int(df['stock_code'].isna().sum())
            if missing_code:
                print(f"⚠️ 任务{task_id}有{missing_code}行缺少stock_code，已跳过")

            # One pass over df instead of re-filtering the whole frame for every stock.
            for stock_code, stock_df in df.groupby('stock_code', sort=False):
                stock_dir = os.path.join(self.formal_data_root, f"stock_code={stock_code}")
                formal_parquet_path = os.path.join(stock_dir, "data.parquet")
                os.makedirs(stock_dir, exist_ok=True)
                print(f"\n🔄 处理股票: {stock_code} | 正式文件: {formal_parquet_path}")

                if os.path.exists(formal_parquet_path):
                    existing_df = pd.read_parquet(formal_parquet_path)
                    combined_df = self._merge_stock_frames(existing_df, stock_df)
                    self._write_parquet_atomic(combined_df, formal_parquet_path)
                    print(f"✅ 已追加数据 | 原记录数: {len(existing_df)} | 新记录数: {len(stock_df)} | 合并后: {len(combined_df)}")
                else:
                    combined_df = self._merge_stock_frames(None, stock_df)
                    self._write_parquet_atomic(combined_df, formal_parquet_path)
                    print(f"✅ 新建Parquet文件 | 初始记录数: {len(combined_df)}")

            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)
                print(f"\n🗑️ 任务{task_id}临时CSV已删除: {temp_file_path}")
            else:
                print(f"\n⚠️ 任务{task_id}临时CSV不存在，无需删除")

            print(f"🎉 任务{task_id}所有股票已合并到正式分区")
            return True
        except Exception as e:
            print(f"❌ 任务{task_id}合并/清理失败: {str(e)[:100]}")
            print(f"⚠️ 合并失败，临时CSV已保留: {temp_file_path}")
            return False

    def _process_full_flow(self, csv_str: str, task_id: str) -> bool:
        """Run stages 1-3 for one payload; return True only if all three succeed."""
        temp_df, temp_file = self._stage1_download_to_temp(csv_str, task_id)
        if temp_df is None or temp_file is None:
            return False
        if not self._stage2_verify_temp_data(temp_df, task_id):
            return False
        return self._stage3_merge_and_clean(temp_df, task_id, temp_file)

    def _handle_result(self, result, stats: dict) -> None:
        """Process one unpickled queue message and update the ``stats`` counters in place."""
        if not isinstance(result, dict):
            stats["failed"] += 1
            print(f"❌ 收到无法识别的结果类型: {type(result).__name__} | 累计失败: {stats['failed']}")
            return

        task_id = result.get("task_id", "未知任务")
        short_id = str(task_id)[:8]  # task_id may not be a str
        if result.get("status") == "success":
            csv_data = result.get("result", "")
            if self._process_full_flow(csv_data, task_id):
                stats["success"] += 1
                # Only successful tasks have their metadata entry removed.
                self.redis.hdel(self.task_metadata, task_id)
                print(f"\n🏆 任务{short_id}...处理完成 | 累计成功: {stats['success']}")
            else:
                stats["failed"] += 1
                print(f"❌ 任务{short_id}...处理失败 | 累计失败: {stats['failed']}")
        else:
            stats["failed"] += 1
            error_msg = result.get("error", "无错误信息")
            print(f"❌ 任务{short_id}...远程失败: {error_msg} | 累计失败: {stats['failed']}")

    @staticmethod
    def _print_summary(stats: dict) -> None:
        """Print the success/failure totals and the success rate (when any task was seen)."""
        total = stats["success"] + stats["failed"]
        print("\n" + "="*50)
        print("处理结果汇总")
        print(f"总任务数: {total} | 成功: {stats['success']} | 失败: {stats['failed']}")
        if total > 0:
            print(f"成功率: {stats['success']/total*100:.1f}%")
        print("="*50)

    def listen_and_process(self):
        """Block on the result queue and process messages until ``idle_timeout`` elapses.

        Each BLPOP waits up to 30 s. Errors while receiving or handling a message are
        printed and followed by a 10 s back-off. The summary is printed on exit, even
        when the loop is interrupted (e.g. KeyboardInterrupt).
        """
        print(f"✅ 开始监听结果队列（{self.idle_timeout}秒无任务退出）")
        stats = {"success": 0, "failed": 0, "last_active": time.time()}

        try:
            while True:
                if time.time() - stats["last_active"] > self.idle_timeout:
                    print("\n⏰ 长时间无新任务，退出处理器")
                    self._clean_residual_temp_files()
                    break

                try:
                    queue_result = self.redis.blpop(self.result_queue, timeout=30)
                    if queue_result is None:
                        continue  # timed out with no message
                    _, result_bytes = queue_result
                    stats["last_active"] = time.time()
                    # SECURITY: pickle.loads can execute arbitrary code embedded in the
                    # payload. This is only safe if every producer writing to this Redis
                    # queue is trusted; consider JSON if that cannot be guaranteed.
                    result = pickle.loads(result_bytes)
                    self._handle_result(result, stats)
                except Exception as e:
                    print(f"⚠️ 处理器异常: {str(e)[:80]}，等待10秒重试")
                    time.sleep(10)
        finally:
            self._print_summary(stats)

    def _clean_residual_temp_files(self):
        """Delete every ``*_raw.csv`` left in the temporary directory.

        Note: this also removes CSVs that stages 2/3 deliberately kept after a failure.
        """
        if not os.path.isdir(self.temp_download_dir):
            print(f"✅ 临时目录无残留文件: {self.temp_download_dir}")
            return
        residual_files = [f for f in os.listdir(self.temp_download_dir) if f.endswith("_raw.csv")]
        if not residual_files:
            print(f"✅ 临时目录无残留文件: {self.temp_download_dir}")
            return

        print(f"\n🗑️ 开始清理临时目录残留文件（共{len(residual_files)}个）")
        for file_name in residual_files:
            file_path = os.path.join(self.temp_download_dir, file_name)
            try:
                os.remove(file_path)
                print(f"✅ 删除残留文件: {file_name}")
            except Exception as e:
                print(f"❌ 删除残留文件{file_name}失败: {str(e)[:50]}")
        print(f"✅ 临时目录清理完成: {self.temp_download_dir}")

if __name__ == "__main__":
    # Script/notebook entry point: build the processor from ./redis.conf, then
    # block on the result queue until the idle timeout elapses.
    # SystemExit raised on a failed Redis PING is a BaseException, so it is not
    # intercepted here and still terminates the run.
    try:
        processor = MinuteDataProcessor(config_path="redis.conf")
        processor.listen_and_process()
    except Exception as startup_error:
        print(f"❌ 处理器启动失败: {startup_error}")

# In [None]:
# import redis
# import pickle
# import time
# import pandas as pd
# import os
# import pyarrow as pa
# import pyarrow.parquet as pq
# from io import StringIO
# from datetime import datetime
# from typing import Dict, Optional

# class MinuteDataProcessor:
#     def __init__(self, config_path: str = "redis.conf"):
#         """初始化处理器，与发布器保持相同的Redis配置逻辑"""
#         self.redis_config = self._load_redis_config(config_path)
#         self.redis = redis.Redis(
#             host=self.redis_config["host"],
#             port=self.redis_config["port"],
#             password=self.redis_config["password"],
#             decode_responses=False
#         )
#         self.result_queue = "function_results"  # 与发布器对应
#         self.task_metadata = "task_metadata"     # 与发布器存储元信息的键一致
#         self.storage_root = r"D:\workspace\xiaoyao\data\stock_minutely_price"
#         self.idle_timeout = 1800  # 30分钟无任务退出
#         self._test_redis_connection()
#         self._init_storage()

#     def _load_redis_config(self, config_path: str) -> Dict[str, str]:
#         """复用发布器的Redis配置加载逻辑，确保一致"""
#         config = {"host": "localhost", "port": 6379, "password": ""}
#         try:
#             with open(config_path, "r", encoding="utf-8") as f:
#                 for line in f:
#                     line = line.strip()
#                     if line.startswith("host="):
#                         config["host"] = line.split("=", 1)[1].strip()
#                     elif line.startswith("port="):
#                         config["port"] = int(line.split("=", 1)[1].strip())
#                     elif line.startswith("password="):
#                         config["password"] = line.split("=", 1)[1].strip()
#             return config
#         except Exception as e:
#             print(f"⚠️ 配置文件读取失败，使用默认配置: {e}")
#             return config

#     def _test_redis_connection(self):
#         """测试Redis连接，与发布器逻辑一致"""
#         try:
#             self.redis.ping()
#             print(f"✅ 处理器Redis连接成功 | {self.redis_config['host']}:{self.redis_config['port']}")
#         except Exception as e:
#             print(f"❌ 处理器Redis连接失败: {e}")
#             raise SystemExit(1)

#     def _init_storage(self):
#         """初始化存储目录"""
#         os.makedirs(self.storage_root, exist_ok=True)
#         print(f"✅ 数据存储目录: {self.storage_root}")

#     def _process_csv_data(self, csv_str: str, task_id: str) -> bool:
#         """处理CSV数据并按股票存储（仅按股票分区）"""
#         if not csv_str.strip():
#             print(f"⚠️ 任务{task_id}返回空数据，跳过处理")
#             return False

#         try:
#             # 从CSV字符串读取数据（与发布器的CSV处理逻辑兼容）
#             df = pd.read_csv(StringIO(csv_str))
            
#             # 必要字段校验
#             required_cols = ['date', 'stock_code', 'time', 'open', 'close', 'high', 'low', 'volume']
#             missing_cols = [col for col in required_cols if col not in df.columns]
#             if missing_cols:
#                 raise ValueError(f"缺少必要字段: {missing_cols}")

#             # 按股票代码存储（仅股票分区，无日期分区）
#             for stock_code in df['stock_code'].unique():
#                 stock_data = df[df['stock_code'] == stock_code].copy()
#                 stock_dir = os.path.join(self.storage_root, f"stock_code={stock_code}")
#                 os.makedirs(stock_dir, exist_ok=True)
#                 parquet_path = os.path.join(stock_dir, "data.parquet")

#                 # 转换为Arrow表并追加/创建文件
#                 table = pa.Table.from_pandas(stock_data)
#                 if os.path.exists(parquet_path):
#                     existing_table = pq.read_table(parquet_path)
#                     combined_table = pa.concat_tables([existing_table, table])
#                     pq.write_table(combined_table, parquet_path, compression="snappy")
#                 else:
#                     pq.write_table(table, parquet_path, compression="snappy")

#             return True
#         except Exception as e:
#             print(f"❌ 任务{task_id}数据处理失败: {str(e)[:100]}")
#             return False

#     def listen_and_process(self):
#         """监听结果队列并处理数据"""
#         print(f"✅ 开始监听结果队列（{self.idle_timeout}秒无任务退出）")
#         stats = {"success": 0, "failed": 0, "last_active": time.time()}

#         while True:
#             # 检查超时退出
#             if time.time() - stats["last_active"] > self.idle_timeout:
#                 print("\n⏰ 长时间无新任务，退出处理器")
#                 break

#             # 从队列获取结果（与发布器的序列化方式匹配）
#             try:
#                 _, result_bytes = self.redis.blpop(self.result_queue, timeout=30)
#                 if not result_bytes:
#                     continue  # 无数据，继续等待

#                 # 更新活动时间
#                 stats["last_active"] = time.time()

#                 # 反序列化结果（使用pickle，与发布器一致）
#                 result = pickle.loads(result_bytes)
#                 task_id = result.get("task_id", "未知任务")

#                 # 处理结果（与发布器的任务结构对应）
#                 if result.get("status") == "success":
#                     # 处理成功结果
#                     csv_data = result.get("result", "")
#                     if self._process_csv_data(csv_data, task_id):
#                         stats["success"] += 1
#                         # 清理任务元信息（与发布器存储的元信息键对应）
#                         self.redis.hdel(self.task_metadata, task_id)
#                         print(f"✅ 任务{task_id[:8]}...处理成功 | 累计成功: {stats['success']}")
#                     else:
#                         stats["failed"] += 1
#                         print(f"❌ 任务{task_id[:8]}...数据处理失败 | 累计失败: {stats['failed']}")
#                 else:
#                     # 处理远程执行失败的任务
#                     stats["failed"] += 1
#                     error_msg = result.get("error", "无错误信息")
#                     print(f"❌ 任务{task_id[:8]}...远程执行失败: {error_msg} | 累计失败: {stats['failed']}")

#             except Exception as e:
#                 print(f"⚠️ 处理器异常: {str(e)[:80]}，等待10秒重试")
#                 time.sleep(10)

#         # 输出最终统计
#         print("\n" + "="*50)
#         print("结果处理总结")
#         print(f"总处理任务数: {stats['success'] + stats['failed']}")
#         print(f"成功: {stats['success']} | 失败: {stats['failed']}")
#         if stats["success"] + stats["failed"] > 0:
#             print(f"成功率: {stats['success']/(stats['success']+stats['failed'])*100:.1f}%")
#         print("="*50)

# if __name__ == "__main__":
#     try:
#         processor = MinuteDataProcessor(config_path="redis.conf")
#         processor.listen_and_process()
#     except Exception as e:
#         print(f"❌ 处理器执行失败: {e}")
