In [1]:
import pandas as pd
import numpy as np
import json
import re
import time
from typing import List, Dict, Any, Optional, Union, Tuple

class LLMFeaturePipeline:
    """
    LLM驱动的特征工程管道，实现询问建议-获得建议-实施代码-获得新特征的全流程
    """
    
    def __init__(self, llm_api_key: str, model: str = "gpt-4", verbose: bool = True, provider: str = "openai"):
        """
        初始化LLM特征工程管道
        
        参数:
            llm_api_key: LLM API密钥
            model: 使用的LLM模型
            verbose: 是否打印详细信息
            provider: LLM提供商，支持"openai"或"gemini"
        """
        self.api_key = llm_api_key
        self.model = model
        self.verbose = verbose
        self.provider = provider.lower()
        self.setup_api()
        self.feature_suggestions = []
        self.implemented_features = {}
        self.execution_history = []
        
    def setup_api(self):
        """
        设置API客户端
        """
        if self.provider == "openai":
            try:
                import openai
                openai.api_key = self.api_key
                self.client = openai
                if self.verbose:
                    print("✅ OpenAI API客户端设置成功")
            except ImportError:
                raise ImportError("请安装openai库: pip install openai")
        elif self.provider == "gemini":
            try:
                from google import genai
                self.client = genai.Client(api_key=self.api_key)
                if self.verbose:
                    print("✅ Gemini API客户端设置成功")
            except ImportError:
                raise ImportError("请安装google-generativeai库: pip install google-generativeai")
        else:
            raise ValueError(f"不支持的提供商: {self.provider}，目前支持 'openai' 或 'gemini'")
    
    def call_llm(self, prompt: str, system_message: str = None) -> str:
        """
        调用LLM API获取回复
        
        参数:
            prompt: 用户提示
            system_message: 系统提示
            
        返回:
            LLM回复的内容
        """
        if self.provider == "openai":
            messages = []
            if system_message:
                messages.append({"role": "system", "content": system_message})
            
            messages.append({"role": "user", "content": prompt})
            
            try:
                response = self.client.ChatCompletion.create(
                    model=self.model,
                    messages=messages
                )
                return response.choices[0].message.content
            except Exception as e:
                if self.verbose:
                    print(f"❌ API调用失败: {e}")
                time.sleep(2)  # 等待一下再重试
                try:
                    response = self.client.ChatCompletion.create(
                        model=self.model,
                        messages=messages
                    )
                    return response.choices[0].message.content
                except Exception as e2:
                    print(f"❌ 再次API调用失败: {e2}")
                    return "API调用失败，请检查网络连接和API密钥。"
        
        elif self.provider == "gemini":
            try:
                # 构建提示内容
                contents = prompt
                
                if system_message:
                    from google.genai import types
                    response = self.client.models.generate_content(
                        model=self.model,
                        contents=contents,    
                        config=types.GenerateContentConfig(
                            system_instruction=system_message)
                    )
                else:
                    response = self.client.models.generate_content(
                        model=self.model,
                        contents=contents 
                    )
                
                return response.text
                
            except Exception as e:
                if self.verbose:
                    print(f"❌ Gemini API调用失败: {e}")
                time.sleep(2)  # 等待一下再重试
                
                try:
                    # 简化请求再尝试
                    response = self.client.models.generate_content(
                        model=self.model, 
                        contents=prompt
                    )
                    return response.text
                except Exception as e2:
                    print(f"❌ Gemini API再次调用失败: {e2}")
                    return "Gemini API调用失败，请检查网络连接和API密钥。"
        
        return "不支持的提供商"

    def parse_code_from_response(self, response: str) -> str:
        """
        从LLM回复中提取Python代码，支持嵌套代码块
        """
        # 尝试匹配最外层的Python代码块
        code_pattern = r"```python(.*?)```"
        matches = re.findall(code_pattern, response, re.DOTALL)
        
        if matches:
            # 清理提取的代码
            extracted_code = matches[0].strip()
            
            # 检查是否有内部代码块标记，并移除它们
            extracted_code = re.sub(r'```\w*\n', '', extracted_code)
            extracted_code = extracted_code.replace('\n```', '')
            
            return extracted_code
        
        # 如果没有Markdown格式，尝试查找可能的Python代码部分
        if "def " in response and "return" in response:
            code_start = response.find("def ")
            
            # 找到代码块的结束位置
            code_lines = response[code_start:].split('\n')
            end_line = 0
            indent_level = 0
            in_function = False
            
            for i, line in enumerate(code_lines):
                if line.strip().startswith("def ") and line.strip().endswith(":"):
                    in_function = True
                    indent_level = len(line) - len(line.lstrip())
                    continue
                    
                if in_function:
                    if line.strip() and not line.startswith(" " * (indent_level + 4)):
                        # 缩进减少，可能是函数结束
                        if i > 2:  # 至少包含函数定义和一行函数体
                            end_line = i
                            break
            
            if end_line > 0:
                extracted_code = "\n".join(code_lines[:end_line])
                return extracted_code
            else:
                return "\n".join(code_lines)
                
        return ""
    
    def parse_json_from_response(self, response: str) -> Dict:
        """
        从LLM回复中提取JSON内容，改进的健壮版本
        """
        if self.verbose:
            print("\n==== LLM原始响应 ====")
            print(response)
            print("=====================\n")
        
        # 首先尝试直接解析完整响应中的JSON部分
        try:
            # 查找最外层的JSON结构
            json_pattern = r"```json(.*?)```"
            matches = re.findall(json_pattern, response, re.DOTALL)
            
            if matches:
                # 提取JSON字符串并清理
                json_str = matches[0].strip()
                
                # 替换内嵌的代码块
                code_pattern = r"```python(.*?)```"
                json_str = re.sub(code_pattern, lambda m: json.dumps(m.group(1)), json_str)
                
                # 标准化换行符和空格
                json_str = re.sub(r'[\r\n\t]+', ' ', json_str)
                json_str = re.sub(r'\s{2,}', ' ', json_str)
                
                # 尝试解析
                try:
                    return json.loads(json_str)
                except json.JSONDecodeError:
                    # 尝试使用更严格的解析方式
                    return self._extract_json_array_or_object(json_str)
                    
            # 尝试从整个文本中提取JSON数组或对象
            return self._extract_json_array_or_object(response)
        
        except Exception as e:
            if self.verbose:
                print(f"⚠️ JSON解析失败: {e}")
            return self._fallback_parse_suggestions(response)

    def _extract_json_array_or_object(self, text: str) -> Dict:
        """
        从文本中提取JSON数组或对象
        """
        # 查找JSON数组模式：[...]
        array_match = re.search(r'\[\s*\{.*\}\s*\]', text, re.DOTALL)
        if array_match:
            try:
                return json.loads(array_match.group(0))
            except json.JSONDecodeError:
                pass
        
        # 查找JSON对象模式：{...}
        object_match = re.search(r'\{\s*".*"\s*:.*\}', text, re.DOTALL)
        if object_match:
            try:
                return json.loads(object_match.group(0))
            except json.JSONDecodeError:
                pass
        
        # 如果都失败了，返回空结果
        return {}

    def _fallback_parse_suggestions(self, text: str) -> List[Dict]:
        """
        作为最后的手段，从文本中提取建议
        """
        suggestions = []
        
        # 使用正则表达式从文本中提取单个建议
        suggestion_pattern = r'"suggestion_id":\s*"([^"]+)".*?"description":\s*"([^"]+)".*?"rationale":\s*"([^"]+)"'
        matches = re.findall(suggestion_pattern, text, re.DOTALL)
        
        for i, match in enumerate(matches):
            suggestion_id, description, rationale = match
            
            # 为每个匹配项提取代码实现
            implementation_pattern = r'"implementation":\s*"(.*?)"'
            impl_match = re.search(implementation_pattern, text[text.find(suggestion_id):], re.DOTALL)
            implementation = impl_match.group(1) if impl_match else ""
            
            # 提取受影响的列
            affected_cols_pattern = r'"affected_columns":\s*\[(.*?)\]'
            cols_match = re.search(affected_cols_pattern, text[text.find(suggestion_id):], re.DOTALL)
            affected_columns = self._parse_string_array(cols_match.group(1)) if cols_match else []
            
            # 提取新特征
            new_features_pattern = r'"new_features":\s*\[(.*?)\]'
            features_match = re.search(new_features_pattern, text[text.find(suggestion_id):], re.DOTALL)
            new_features = self._parse_string_array(features_match.group(1)) if features_match else []
            
            suggestion = {
                "suggestion_id": suggestion_id,
                "suggestion_type": self._guess_suggestion_type(description),
                "description": description,
                "rationale": rationale,
                "implementation": implementation,
                "affected_columns": affected_columns,
                "new_features": new_features
            }
            
            suggestions.append(suggestion)
        
        if not suggestions:
            # 如果上面的方法都失败了，回退到原来的提取方法
            suggestions = self._extract_suggestions_from_text(text)
        
        return suggestions

    def _parse_string_array(self, array_str: str) -> List[str]:
        """解析字符串数组"""
        values = []
        for item in array_str.split(','):
            item = item.strip().strip('"\'')
            if item:
                values.append(item)
        return values

    def get_dataframe_info(self, df: pd.DataFrame) -> Dict:
        """
        获取数据帧的基本信息
        
        参数:
            df: 输入数据帧
            
        返回:
            数据帧信息字典
        """
        info = {
            "shape": df.shape,
            "columns": df.columns.tolist(),
            "dtypes": {col: str(dtype) for col, dtype in df.dtypes.items()},
            "missing_values": {col: int(df[col].isna().sum()) for col in df.columns},
            "unique_values": {col: int(df[col].nunique()) for col in df.columns}
        }
        
        # 对分类特征收集值分布
        cat_cols = df.select_dtypes(include=['object', 'category']).columns
        if len(cat_cols) > 0:
            info["categorical_distributions"] = {}
            for col in cat_cols:
                if df[col].nunique() < 15:  # 只包括较少唯一值的特征
                    info["categorical_distributions"][col] = df[col].value_counts().to_dict()
        
        # 对数值特征收集基本统计信息
        num_cols = df.select_dtypes(include=['int64', 'float64']).columns
        if len(num_cols) > 0:
            info["numerical_statistics"] = {}
            for col in num_cols:
                info["numerical_statistics"][col] = {
                    "min": float(df[col].min()) if not pd.isna(df[col].min()) else None,
                    "max": float(df[col].max()) if not pd.isna(df[col].max()) else None,
                    "mean": float(df[col].mean()) if not pd.isna(df[col].mean()) else None,
                    "median": float(df[col].median()) if not pd.isna(df[col].median()) else None
                }
                
        return info

    def ask_for_feature_suggestions(self, df: pd.DataFrame, 
                                    task_description: str, 
                                    target_column: Optional[str] = None,
                                    dataset_background: Optional[str] = None,
                                    custom_prompt: Optional[str] = None) -> List[Dict]:
        """
        询问LLM提供特征工程建议
        
        参数:
            df: 输入数据帧
            task_description: 任务描述
            target_column: 目标列名称
            dataset_background: 数据集背景信息，帮助模型理解数据
            custom_prompt: 自定义提示（如果需要）
            
        返回:
            特征工程建议列表
        """
        # 准备数据帧信息
        df_info = self.get_dataframe_info(df)
        data_sample = df.head(3).to_dict() if df.shape[0] > 0 else {}
        
        system_message = """你是一位专业的特征工程专家，擅长发现数据中的模式和创建有价值的特征。
请提供具体、可执行的特征工程建议，每个建议都应包含详细的实现方式。以JSON格式回复。"""
        
        if custom_prompt:
            prompt = custom_prompt
        else:
            background_section = ""
            if dataset_background:
                background_section = f"""
数据集背景：
{dataset_background}
"""

            prompt = f"""
我有一个机器学习项目，需要你帮我进行特征工程。
            
任务描述：{task_description}

{"目标列：" + target_column if target_column else ""}
{background_section}
数据集信息：
- 形状：{df_info['shape']}
- 列：{df_info['columns']}
- 数据类型：{df_info['dtypes']}
- 缺失值：{df_info['missing_values']}
- 唯一值数量：{df_info['unique_values']}

分类特征分布：
{json.dumps(df_info.get('categorical_distributions', {}), indent=2)}

数值特征统计：
{json.dumps(df_info.get('numerical_statistics', {}), indent=2)}

数据样例：
{json.dumps(data_sample, indent=2)}

请提供5-10个有价值的特征工程建议，包括：
1. 特征转换（如二值化、标准化、独热编码等）
2. 特征交互（如特征组合、比率特征等）
3. 基于领域知识的特征（如时间特征、文本特征等）

对每个建议，请提供以下信息，以JSON数组格式返回：
[
  {{
    "suggestion_id": "唯一标识符",
    "suggestion_type": "转换|交互|领域知识|其他",
    "description": "详细的建议描述",
    "rationale": "为什么这个特征可能有价值",
    "implementation": "Python代码实现（可作为一个函数）",
    "affected_columns": ["受影响的列"],
    "new_features": ["新生成的特征名称"]
  }},
  ...
]
"""
        if self.verbose:
            print("🔍 正在询问LLM提供特征工程建议...")
            
        response = self.call_llm(prompt, system_message)
        
        try:
            suggestions = self.parse_json_from_response(response)
            if isinstance(suggestions, list):
                self.feature_suggestions = suggestions
                if self.verbose:
                    print(f"✅ 收到{len(suggestions)}个特征工程建议")
                return suggestions
            else:
                if self.verbose:
                    print("⚠️ LLM返回格式不正确，尝试提取建议")
                return self._extract_suggestions_from_text(response)
        except Exception as e:
            if self.verbose:
                print(f"❌ 解析建议失败: {e}")
            return []
    
    def _extract_suggestions_from_text(self, text: str) -> List[Dict]:
        """
        从文本回复中提取建议
        
        参数:
            text: LLM回复文本
            
        返回:
            提取的建议列表
        """
        if self.verbose:
            print("\n==== 尝试从文本中提取建议 ====")
            print(f"文本长度: {len(text)} 字符")
            print("前500个字符预览:")
            print(text[:500] + "..." if len(text) > 500 else text)
            print("============================\n")
            
        suggestions = []
        
        # 寻找可能的建议部分
        suggestion_blocks = re.split(r'\n\d+[\.\)]\s+', text)
        
        if self.verbose:
            print(f"找到 {len(suggestion_blocks) - 1} 个潜在的建议块")
        
        for i, block in enumerate(suggestion_blocks[1:], 1):  # 跳过第一个可能是介绍的部分
            if self.verbose and i <= 3:  # 只显示前3个块作为示例
                print(f"\n== 建议块 #{i} 预览 ==")
                preview = block[:200] + "..." if len(block) > 200 else block
                print(preview)
                print("===================")
                
            lines = block.strip().split('\n')
            
            if not lines:
                continue
                
            # 提取建议信息
            title = lines[0].strip()
            description = "\n".join(lines[1:])
            
            # 提取代码部分
            code = self.parse_code_from_response(block)
            
            if self.verbose and code:
                print(f"从建议 #{i} 中提取到代码:")
                print(code[:200] + "..." if len(code) > 200 else code)
            
            suggestion = {
                "suggestion_id": f"auto_extracted_{i}",
                "suggestion_type": self._guess_suggestion_type(title),
                "description": title,
                "rationale": description,
                "implementation": code if code else "# 需要手动实现",
                "affected_columns": [],
                "new_features": []
            }
            
            suggestions.append(suggestion)
        
        if self.verbose:
            print(f"📝 从文本中提取了{len(suggestions)}个建议")
            
        self.feature_suggestions = suggestions
        return suggestions
    
    def _guess_suggestion_type(self, text: str) -> str:
        """
        根据文本猜测建议类型
        
        参数:
            text: 建议文本
            
        返回:
            猜测的建议类型
        """
        text = text.lower()
        
        if any(word in text for word in ["交互", "组合", "乘积", "比率", "interaction"]):
            return "交互"
        elif any(word in text for word in ["标准化", "归一化", "编码", "二值化", "transform", "encoding"]):
            return "转换"
        elif any(word in text for word in ["领域", "知识", "domain", "knowledge"]):
            return "领域知识"
        else:
            return "其他"
        
    def implement_feature_suggestion(self, df: pd.DataFrame, suggestion_id: str) -> Tuple[pd.DataFrame, Dict]:
        """
        实现特定的特征工程建议，增强版
        """
        # 查找对应的建议
        suggestion = None
        for s in self.feature_suggestions:
            if s.get("suggestion_id") == suggestion_id:
                suggestion = s
                break
                
        if not suggestion:
            if self.verbose:
                print(f"❌ 找不到ID为{suggestion_id}的建议")
            return df, {"status": "error", "message": f"找不到ID为{suggestion_id}的建议"}
        
        if self.verbose:
            print(f"🔧 正在实现建议: {suggestion.get('description', suggestion_id)}")
        
        # 提取实现代码并清理
        implementation_code = suggestion.get("implementation", "")
        
        # 清理代码中的Markdown标记和特殊字符
        implementation_code = self._clean_implementation_code(implementation_code)
        
        if not implementation_code or implementation_code == "# 需要手动实现":
            # 如果没有实现代码，请求LLM生成
            if self.verbose:
                print("📝 建议中没有实现代码，正在请求LLM生成...")
            
            implementation_code = self._generate_implementation_code(df, suggestion)
        
        # 确保代码是一个函数，如果不是则包装它
        if not implementation_code.strip().startswith("def "):
            function_name = f"feature_{suggestion_id.replace('-', '_').replace('.', '_')}"
            
            # 检查代码是否已经包含函数调用
            if "df = " in implementation_code or "return df" in implementation_code:
                # 已经包含处理逻辑，只需要包装成函数
                implementation_code = f"def {function_name}(df):\n" + "\n".join(
                    f"    {line}" for line in implementation_code.split("\n")
                )
            else:
                # 可能只是一些操作步骤，需要添加DataFrame处理逻辑
                implementation_code = f"""def {function_name}(df):
        df_result = df.copy()
        
        # 实现特征工程逻辑
        {implementation_code.strip()}
        
        return df_result"""
        
            # 确保有返回语句
            if "return" not in implementation_code:
                implementation_code = implementation_code.rstrip() + "\n    return df"
        
        # 处理保留原始特征的逻辑
        keep_original = suggestion.get("keep_original", True)  # 默认保留原始特征
        affected_columns = suggestion.get("affected_columns", [])
        
        # 添加安全检查，确保函数语法正确
        implementation_code = self._add_safety_checks(implementation_code, affected_columns)
        
        # 尝试执行实现代码
        return self._execute_implementation(df, suggestion, implementation_code, keep_original, affected_columns)

    def _clean_implementation_code(self, code: str) -> str:
        """清理实现代码中的Markdown标记和特殊字符"""
        # 移除Markdown代码块标记
        code = re.sub(r'```python\s*', '', code)
        code = re.sub(r'\s*```', '', code)
        
        # 移除可能的引号转义
        code = code.replace('\\"', '"')
        
        # 移除开头和结尾的空白
        return code.strip()

    def _add_safety_checks(self, code: str, affected_columns: List[str]) -> str:
        """添加安全检查确保代码正确执行"""
        # 获取函数名
        func_name = re.search(r'def\s+(\w+)', code).group(1)
        
        # 添加列存在性检查
        column_checks = []
        for col in affected_columns:
            if col:
                column_checks.append(f'    # 检查列 "{col}" 是否存在\n    if "{col}" not in df.columns:\n        print(f"警告: 列 \\"{col}\\" 不存在，跳过该列处理")\n        return df')
        
        # 如果有需要检查的列，插入检查代码
        if column_checks:
            # 查找函数定义的末尾
            func_def_end = code.find(":", code.find("def ")) + 1
            
            # 插入安全检查代码
            safety_code = "\n" + "\n".join(column_checks) + "\n    \n    # 创建副本避免修改原始数据\n    df = df.copy()\n"
            code = code[:func_def_end] + safety_code + code[func_def_end:]
        
        return code

    def _execute_implementation(self, df, suggestion, implementation_code, keep_original, affected_columns, is_retry=False):
        """
        执行特征工程实现代码
        
        参数:
            df: 输入数据帧
            suggestion: 特征建议字典
            implementation_code: 要执行的代码
            keep_original: 是否保留原始特征
            affected_columns: 受影响的列
            is_retry: 是否为重试执行
            
        返回:
            (更新的数据帧, 实现结果信息)
        """
        suggestion_id = suggestion.get("suggestion_id")
        
        try:
            # 创建本地命名空间
            local_namespace = {"pd": pd, "np": np}
            
            # 执行代码
            exec(implementation_code, globals(), local_namespace)
            
            # 获取函数名
            function_name = None
            for name, obj in local_namespace.items():
                if callable(obj) and name not in ["pd", "np"]:
                    function_name = name
                    break
            
            if not function_name:
                raise ValueError("无法找到实现函数")
            
            # 调用函数
            result_df = local_namespace[function_name](df)
            
            # 验证结果
            if not isinstance(result_df, pd.DataFrame):
                raise TypeError("实现函数未返回DataFrame")
            
            # 如果指定不保留原始特征，则移除
            if not keep_original and affected_columns:
                # 确保所有受影响的列都被转换后才移除
                safe_to_remove = all(col in df.columns for col in affected_columns)
                if safe_to_remove:
                    for col in affected_columns:
                        if col in result_df.columns and col not in suggestion.get("new_features", []):
                            if self.verbose:
                                print(f"🗑️ 根据建议移除原始特征: {col}")
                            result_df = result_df.drop(col, axis=1)
            
            # 确定新增的特征
            new_features = list(set(result_df.columns) - set(df.columns))
            
            # 记录实现结果
            implementation_result = {
                "suggestion_id": suggestion_id,
                "status": "success",
                "description": suggestion.get("description", ""),
                "code": implementation_code,
                "new_features": new_features,
                "removed_features": [col for col in df.columns if col not in result_df.columns],
                "keep_original": keep_original,
                "keep_original_reason": suggestion.get("keep_original_reason", ""),
                "error": None
            }
            
            self.implemented_features[suggestion_id] = implementation_result
            self.execution_history.append(implementation_result)
            
            if self.verbose:
                print(f"✅ {'使用修复后的代码' if is_retry else ''}成功实现建议，新增{len(new_features)}个特征: {new_features}")
                if implementation_result["removed_features"]:
                    print(f"🗑️ 移除了{len(implementation_result['removed_features'])}个原始特征: {implementation_result['removed_features']}")
            
            return result_df, implementation_result
            
        except Exception as e:
            error_message = str(e)
            
            if self.verbose:
                print(f"❌ 实现建议时出错: {error_message}")
            
            # 如果不是重试，尝试修复代码并重试
            if not is_retry:
                # 尝试修复代码
                fixed_code = self._fix_implementation_code(implementation_code, error_message, df)
                
                # 如果修复了代码，重新尝试
                if fixed_code != implementation_code:
                    if self.verbose:
                        print("🔄 尝试使用修复的代码重新实现...")
                    
                    # 递归调用，但标记为重试，防止无限循环
                    return self._execute_implementation(df, suggestion, fixed_code, keep_original, affected_columns, is_retry=True)
            
            # 记录失败
            implementation_result = {
                "suggestion_id": suggestion_id,
                "status": "error",
                "description": suggestion.get("description", ""),
                "code": implementation_code,
                "new_features": [],
                "removed_features": [],
                "keep_original": keep_original,
                "keep_original_reason": suggestion.get("keep_original_reason", ""),
                "error": error_message if is_retry else f"初始错误: {error_message}"
            }
            
            self.implemented_features[suggestion_id] = implementation_result
            self.execution_history.append(implementation_result)
            
            return df, implementation_result
    
    def _generate_implementation_code(self, df: pd.DataFrame, suggestion: Dict) -> str:
        """
        为建议生成实现代码
        
        参数:
            df: 输入数据帧
            suggestion: 建议详情
            
        返回:
            实现代码
        """
        # 获取数据帧信息
        df_info = self.get_dataframe_info(df)
        
        system_message = """你是一位特征工程专家，能够编写高质量的Python代码来实现特征工程。
请提供完整可执行的Python函数，针对输入的DataFrame实现所需的特征工程。
代码应该是健壮的，能够处理边缘情况，如缺失值和异常值。"""
        
        prompt = f"""
请为以下特征工程建议编写Python实现代码:

建议描述: {suggestion.get('description', '')}
建议理由: {suggestion.get('rationale', '')}
建议类型: {suggestion.get('suggestion_type', '未知')}
受影响的列: {suggestion.get('affected_columns', [])}
预期新特征: {suggestion.get('new_features', [])}

数据集信息:
- 形状: {df_info['shape']}
- 列: {df_info['columns']}
- 数据类型: {df_info['dtypes']}

请编写一个名为`implement_feature`的Python函数，该函数:
1. 接受一个pandas DataFrame作为输入
2. 实现上述特征工程建议
3. 返回包含新特征的DataFrame

代码应该:
- 处理可能的缺失值
- 包含适当的注释
- 遵循Python最佳实践
- 不使用外部数据源

请仅返回Python代码，不需要解释。
"""
        
        response = self.call_llm(prompt, system_message)
        code = self.parse_code_from_response(response)
        
        if not code:
            # 如果没有提取到代码，使用简单的模板
            code = f"""def implement_feature(df):
    \"\"\"
    实现: {suggestion.get('description', '')}
    
    参数:
        df: 输入数据帧
        
    返回:
        包含新特征的数据帧
    \"\"\"
    df_result = df.copy()
    
    # TODO: 实现特征工程逻辑
    
    return df_result
"""
        
        return code
    
    def _fix_implementation_code(self, code: str, error_message: str, df: pd.DataFrame) -> str:
        """
        修复实现代码中的错误
        
        参数:
            code: 原始代码
            error_message: 错误信息
            df: 输入数据帧
            
        返回:
            修复后的代码
        """
        df_info = self.get_dataframe_info(df)
        
        system_message = """你是一位Python专家，能够修复代码中的错误。
请分析错误信息，并提供修复后的代码。只返回完整的、修复后的代码，不需要解释。"""
        
        prompt = f"""
以下代码在执行时出现错误:

```python
{code}
```

错误信息:
{error_message}

数据集信息:
- 形状: {df_info['shape']}
- 列: {df_info['columns']}
- 数据类型: {df_info['dtypes']}

请修复代码中的错误。只返回完整的、修复后的代码，不要有任何解释。
"""
        
        response = self.call_llm(prompt, system_message)
        fixed_code = self.parse_code_from_response(response)
        
        if not fixed_code:
            # 如果没有提取到代码，返回原始代码
            return code
            
        return fixed_code
    
    def implement_all_suggestions(self, df: pd.DataFrame) -> pd.DataFrame:
        """
        实现所有的特征工程建议
        
        参数:
            df: 输入数据帧
            
        返回:
            包含所有新特征的数据帧
        """
        if not self.feature_suggestions:
            if self.verbose:
                print("⚠️ 没有可用的特征工程建议")
            return df
            
        result_df = df.copy()
        successful_count = 0
        
        for suggestion in self.feature_suggestions:
            suggestion_id = suggestion.get("suggestion_id")
            
            if not suggestion_id:
                continue
                
            if self.verbose:
                print(f"🔍 实现建议 {suggestion_id}: {suggestion.get('description', '')}")
                
            try:
                result_df, impl_result = self.implement_feature_suggestion(result_df, suggestion_id)
                
                if impl_result["status"] == "success":
                    successful_count += 1
            except Exception as e:
                if self.verbose:
                    print(f"❌ 实现建议 {suggestion_id} 时出现未处理的错误: {e}")
        
        if self.verbose:
            print(f"✅ 成功实现 {successful_count}/{len(self.feature_suggestions)} 个建议")
            print(f"🆕 新特征总数: {len(result_df.columns) - len(df.columns)}")
            
        return result_df
    
    def custom_feature_request(self, df: pd.DataFrame, feature_description: str) -> Tuple[pd.DataFrame, Dict]:
        """
        根据自定义描述创建特征
        
        参数:
            df: 输入数据帧
            feature_description: 特征描述
            
        返回:
            (更新的数据帧, 实现结果信息)
        """
        if self.verbose:
            print(f"🔍 正在处理自定义特征请求: {feature_description}")
            
        df_info = self.get_dataframe_info(df)
        
        system_message = """你是一位特征工程专家，能够根据描述创建有价值的特征。
请提供完整可执行的Python函数，实现所需的特征工程。"""

        prompt = f"""
请根据以下描述创建新特征:

特征描述: {feature_description}

数据集信息:
- 形状: {df_info['shape']}
- 列: {df_info['columns']}
- 数据类型: {df_info['dtypes']}

请编写一个名为`create_custom_feature`的Python函数，该函数:
1. 接受一个pandas DataFrame作为输入
2. 根据上述描述创建新特征
3. 返回包含新特征的DataFrame

代码应该:
- 处理可能的缺失值
- 包含适当的注释
- 遵循Python最佳实践

请仅返回Python代码，不需要解释。
"""
        
        response = self.call_llm(prompt, system_message)
        implementation_code = self.parse_code_from_response(response)
        
        # 生成唯一ID
        suggestion_id = f"custom_{int(time.time())}"
        
        # 创建建议对象
        suggestion = {
            "suggestion_id": suggestion_id,
            "suggestion_type": "自定义",
            "description": feature_description,
            "rationale": "用户自定义特征",
            "implementation": implementation_code,
            "affected_columns": [],
            "new_features": []
        }
        
        # 添加到建议列表
        self.feature_suggestions.append(suggestion)
        
        # 实现建议
        return self.implement_feature_suggestion(df, suggestion_id)

In [2]:
import getpass
import os

# Credentials must never be committed in a notebook. The key is read from the
# GEMINI_API_KEY environment variable, with an interactive prompt as fallback.
# NOTE: the key that used to be hard-coded here has been exposed and should be revoked.
gemini_api_key = os.environ.get("GEMINI_API_KEY") or getpass.getpass("Gemini API key: ")
pipeline = LLMFeaturePipeline(llm_api_key=gemini_api_key, provider="gemini", model="gemini-1.5-flash")

✅ Gemini API客户端设置成功


In [4]:
import pandas as pd

# Load the pickled train and test frames (train_eng.pkl, test_eng.pkl).
train_eng, test_eng = (
    pd.read_pickle(pickle_path) for pickle_path in ("train_eng.pkl", "test_eng.pkl")
)


In [5]:
# Ask the LLM for feature ideas on the training frame ...
suggestions = pipeline.ask_for_feature_suggestions(
    train_eng,
    "预测患者存活率",
    dataset_background="这是一个医学数据集，包含患者的各种生理指标和治疗方案。数据来自于某肝病研究项目...",
    target_column="Survival",
)

# ... then apply every suggestion that was stored on the pipeline.
enhanced_df = pipeline.implement_all_suggestions(df=train_eng)


🔍 正在询问LLM提供特征工程建议...

==== LLM原始响应 ====
```json
[
  {
    "suggestion_id": "FE1",
    "suggestion_type": "转换",
    "description": "将二元分类特征转换为数值型 (0, 1)",
    "rationale": "方便模型处理，避免模型对类别顺序敏感。",
    "implementation": "```python\ndef binary_encode(df, columns):\n    for col in columns:\n        df[col] = df[col].map({'N': 0, 'Y': 1, 'F':0, 'M':1, 'N':0, 'S':0.5, 'Y':1, 'C':0, 'D':1, 'CL':0.5})\n    return df\n```",
    "affected_columns": ["Sex", "Ascites", "Hepatomegaly", "Spiders", "Edema"],
    "new_features": ["Sex_encoded", "Ascites_encoded", "Hepatomegaly_encoded", "Spiders_encoded", "Edema_encoded"]
  },
  {
    "suggestion_id": "FE2",
    "suggestion_type": "转换",
    "description": "对数值特征进行标准化或归一化",
    "rationale": "数值特征范围差异较大，标准化可以提高模型的训练效率和泛化能力。",
    "implementation": "```python\nfrom sklearn.preprocessing import StandardScaler\ndef standardize(df, columns):\n    scaler = StandardScaler()\n    df[columns] = scaler.fit_transform(df[columns])\n    return df\n```",
    "affected