此篇研究只需提供
"main.txt"為使用者的大綱與簡短想法
"data_description.txt"為要分析的table columns所代表的意義
就可產生完整報導
(可使用在產生任何報導上不限於羽球)

# STEP 1

刪減不必要的columns

結果保留['rally', 'time', 'roundscore_A', 'roundscore_B', 'player', 'type', 'lose_reason', 'getpoint_player']

In [56]:
#正式
import dspy
import json
import re
from typing import List, Dict, Any, Optional, ClassVar
import os
from dataclasses import dataclass
import pandas as pd
import google.generativeai as genai

class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)
     
    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")
         
        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)
         
        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{
                'text': response.text,
                'logprobs': None
            }]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{
                'text': "⚠️ Gemini API 回應失敗,可能已達限額或出現錯誤。",
                'logprobs': None
            }]
     
    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.settings.configure(lm=lm)
    return lm

def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return file.read()

def parse_list_from_response(response_text: str) -> List[str]:
    """
    Parse a Python list from various response formats including markdown code blocks
    """
    # Remove leading/trailing whitespace
    text = response_text.strip()
    
    # Remove markdown code blocks
    text = re.sub(r'```(?:python)?\s*', '', text)
    text = re.sub(r'```\s*', '', text)
    
    # Remove any additional backticks
    text = text.strip('`').strip()
    
    # Try to find a list pattern in the text
    list_patterns = [
        r'\[([^\]]+)\]',  # Match content within square brackets
        r'(\[.*?\])',     # Match the entire list including brackets
    ]
    
    for pattern in list_patterns:
        match = re.search(pattern, text, re.DOTALL)
        if match:
            list_text = match.group(0) if pattern == r'(\[.*?\])' else '[' + match.group(1) + ']'
            break
    else:
        # If no pattern matches, assume the entire cleaned text is the list
        list_text = text
    
    # Clean up the list text
    list_text = list_text.strip()
    
    # Handle both single and double quotes
    try:
        # First try parsing as-is
        return json.loads(list_text)
    except json.JSONDecodeError:

        # Try converting single quotes to double quotes
        list_text_double_quotes = list_text.replace("'", '"')
        return json.loads(list_text_double_quotes)


def extract_news_relevant_fields(description_path: str, main_path: str):
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        print("❌ GOOGLE_API_KEY 環境變數未設定")
        return []
     
    lm = setup_gemini_api(api_key)
    main_content = read_text_file(main_path)
    description = read_text_file(description_path)
    
    prompt = f"""Using the following outline and list of data column descriptions, select only the columns that are useful for the outline.

## outline
{main_content}

## Data Column Descriptions:
{description}

---

Please return only a Python list of column names, like this:
['player_name', 'match_score', 'duration', ...]

Do not include explanations or any other text. Return only the list."""
     
    result = lm.basic_request(prompt)
    
    print(f"🔍 原始回應: {result}")
    
    selected_fields = parse_list_from_response(result)
    
    if selected_fields:
        print("✅ 篩選出的欄位:", selected_fields)
    else:
        print("❌ 未能成功解析欄位列表")
    
    return selected_fields

if __name__ == "__main__":
    fields = extract_news_relevant_fields("data_description.txt", "main.txt")
    print("\n最終欄位清單:")
    print(fields)

🔍 原始回應: ```python
['rally', 'time', 'roundscore_A', 'roundscore_B', 'player', 'type', 'lose_reason', 'getpoint_player']
```
✅ 篩選出的欄位: ['rally', 'time', 'roundscore_A', 'roundscore_B', 'player', 'type', 'lose_reason', 'getpoint_player']

最終欄位清單:
['rally', 'time', 'roundscore_A', 'roundscore_B', 'player', 'type', 'lose_reason', 'getpoint_player']


In [57]:
#fields = ['rally', 'time', 'roundscore_A', 'roundscore_B', 'player', 'type', 'lose_reason', 'getpoint_player']
df = pd.read_csv("set1.csv")
filtered_df = df[fields]
filtered_df.to_csv("filtered_set1.csv")



更改原始的"data_description.txt"到"filtered_data_description.txt"

In [58]:
def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return file.read()

def extract_descriptions_for_fields(fields: List[str], desc_path: str, output_path: str):
    description_text = read_text_file(desc_path)

    field_desc = {}
    for line in description_text.splitlines():
        for field in fields:
            if line.lower().startswith(field.lower() + ":"):
                field_desc[field] = line.strip()

    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            for field in fields:
                f.write(field_desc.get(field, f"{field}: [Description not found]") + "\n")
        print(f"✅ 已將欄位描述寫入 {output_path}")
    except Exception as e:
        print(f"❌ 寫入失敗: {e}")


extract_descriptions_for_fields(fields, 'data_description.txt', "filtered_data_description.txt")

✅ 已將欄位描述寫入 filtered_data_description.txt


# STEP 2

藉由人為輸入問題與方向提示，給LLM做完整分析問題與方向

In [61]:
import dspy
import json
from typing import List, Dict, Any, Optional, ClassVar
import os
from dataclasses import dataclass
import pandas as pd
import google.generativeai as genai


class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)

    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")

        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)

        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{
                'text': response.text,
                'logprobs': None
            }]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{
                'text': "⚠️ Gemini API 回應失敗,可能已達限額或出現錯誤。",
                'logprobs': None
            }]

    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.settings.configure(lm=lm)
    return lm

def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return file.read()

def generate_chain_of_thought_response(main_path: str, desc_path: str, output_path: str):
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        print("❌ GOOGLE_API_KEY 環境變數未設定")
        return

    lm = setup_gemini_api(api_key)

    main_content = read_text_file(main_path)
    description = read_text_file(desc_path)

    chain_prompt = f"""
You are a planning assistant.
Analyze the following outline and column descriptions.

## Outline & Ideas:
{main_content}

## Data Column Descriptions:
{description}

---

Step-by-step:
1. Reflect on the structure and meaning of the content.
2. Formulate relevant and meanful questions or planning strategies.
3. Be explicit and detailed, use Chain-of-Thought reasoning.
4. Output all thoughts and questions in English only.
"""

    result = lm.basic_request(chain_prompt)

    try:
        with open(output_path, 'w', encoding='utf-8') as f:
            f.write(result)
        print(f"✅ Response saved to: {output_path}")
    except Exception as e:
        print(f"❌ Failed to write output: {e}")

def main():
    generate_chain_of_thought_response(
        main_path="main.txt",
        desc_path="filtered_data_description.txt",
        output_path="analyze_response.txt"
    )

if __name__ == "__main__":
    main()


✅ Response saved to: analyze_response.txt


# STEP 3

請LLM根據"analyze_response.txt"思考可以使用的operation並將結果存於 "operations_info.json"

In [77]:
import dspy
import json
from typing import List, Dict, Any, Optional, ClassVar
import os
from dataclasses import dataclass
import pandas as pd
import google.generativeai as genai

class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)

    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")

        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)

        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{
                'text': response.text,
                'logprobs': None
            }]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{
                'text': "⚠️ Gemini API 回應失敗,可能已達限額或出現錯誤。",
                'logprobs': None
            }]

    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.settings.configure(lm=lm)
    return lm

def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return file.read()

def analyze_operations(analyze_path: str, output_json: str) -> List[str]:
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        print("❌ GOOGLE_API_KEY 環境變數未設定")
        return []

    lm = setup_gemini_api(api_key)
    analysis = read_text_file(analyze_path)

    prompt = f"""
You are a news journalist want to analyze data not forecaster.
Based on the following text analysis, identify multiple useful table operations
and describe the direct meaning of each operation.

## Text Analysis:
{analysis}

---

Please output a numbered list in this format:
1. write: If the table is clear or small enough, generates text based on the tables using the LLM.
2. select_row: Description
3. select_column: Description
4. operation_name: Description
5. operation_name: Description
...

IMPORTANT: operation must contain select_row, select_column, and write in the first three operation.

Give important operations and at most 15 operations.
operation_name should be different and each operation can not be similar.
operation can be apply on many columns is better.
Description just give the original definition of the operation name and give some useful functions name in pandas.
Only include operations and their descriptions. Be concise and clear.
"""

    response = lm.basic_request(prompt)

    operations = []
    operations_dict = {}

    try:
        for line in response.strip().split('\n'):
            if line.strip() == "":
                continue
            if "." in line:
                num, rest = line.split(".", 1)
                if ":" in rest:
                    name, desc = rest.strip().split(":", 1)
                    name = name.strip()
                    desc = desc.strip()
                    operations.append(name)
                    operations_dict[num.strip()] = {"operation": name, "description": desc}

        with open(output_json, 'w', encoding='utf-8') as f:
            json.dump(operations_dict, f, indent=2, ensure_ascii=False)

        print(f"✅ 操作清單與描述已儲存至 {output_json}")
        return operations

    except Exception as e:
        print(f"❌ 回應處理失敗: {e}\n原始回應:\n{response}")
        return []

ops = analyze_operations("analyze_response.txt", "operations_info.json")
print("\n✅ 操作名稱陣列:")
print(ops)

✅ 操作清單與描述已儲存至 operations_info.json

✅ 操作名稱陣列:
['write', 'select_row', 'select_column', 'group_by', 'aggregate', 'value_counts', 'crosstab', 'pivot_table', 'sort', 'calculate', 'merge', 'rolling_window', 'normalize', 'correlation', 'one_hot_encoding']


# STEP 4

使LLM自動分析table選出合適的operation放入操作池(operations)

In [31]:
# badminton_analysis.py
#正式
import pandas as pd
import numpy as np
import dspy
import os
import re
import json
from openai import OpenAI

# Define a simple signature for the ChainOfThought
class OperationSignature(dspy.Signature):
    """Identify suitable operations for analyzing badminton match data."""
    data_description = dspy.InputField(desc="Overview and sample of the dataset")
    column_descriptions = dspy.InputField(desc="Descriptions of each column in the dataset")
    rules = dspy.InputField(desc="Rules for selecting operations")
    operations_list = dspy.OutputField(desc="A list of suitable operations number")

def read_badminton_data(file_path):
    try:
        return pd.read_csv(file_path, encoding='utf-8')
    except UnicodeDecodeError:
        return pd.read_csv(file_path, encoding='latin1')

def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return file.read()

def read_json_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return json.load(file)

def parse_column_descriptions(description_text):
    descriptions = {}
    pattern = r'''
        ^                # Line start
        (\w+)            # Column name
        :\s+             # Colon and space
        (.+?)            # Description text
        (?=\n\w+:\s+|\Z) # Lookahead for next column or end of file
    '''
    matches = re.findall(pattern, description_text, flags=re.M | re.X)
    for col_name, desc in matches:
        clean_desc = ' '.join(desc.split()).strip()
        descriptions[col_name] = clean_desc
    return descriptions

class BadmintonOperationSelector(dspy.Module):
    def __init__(self):
        super().__init__()
        self.chain_of_thought = dspy.ChainOfThought(OperationSignature)

    def forward(self, data_description, column_descriptions, rules):
        result = self.chain_of_thought(
            data_description=data_description,
            column_descriptions=str(column_descriptions),
            rules=str(rules)
        )
        return self.extract_operations_from_result(result.operations_list)

    def extract_operations_from_result(self, operations_text):
        operations = []
        lines = operations_text.split('\n')
        for line in lines:
            line = line.strip()
            if not line:
                continue
            line = re.sub(r'^[\d\.\)\-\*]+\s*', '', line)
            if line and len(line) < 100:
                operations.append(line)
        return operations

def analyze_badminton_match(data_path, column_desc_path, rules_path, api_key):
    print("Reading badminton match data...")
    try:
        match_data = read_badminton_data(data_path)
        columns_desc_content = read_text_file(column_desc_path)
        rules = read_json_file(rules_path)
    except Exception as e:
        print(f"Error reading files: {e}")
        return []

    column_descriptions = parse_column_descriptions(columns_desc_content)
    setup_gemini_api(api_key)

    data_sample = match_data.to_string()
    data_description = f"""
    one match data:
    {data_sample}

    Data shape: {match_data.shape[0]} rows, {match_data.shape[1]} columns
    Columns: {', '.join(match_data.columns)}
    """

    selector = BadmintonOperationSelector()
    operations = selector.forward(data_description, column_descriptions, rules)

    print(f"Identified {len(operations)} suitable operations:")
    for i, op in enumerate(operations, 1):
        print(f"{i}. {op}")

    return operations

def setup_gemini_api(api_key):
    os.environ['GOOGLE_API_KEY'] = api_key

if __name__ == "__main__":
    data_path = "set1.csv"
    column_desc_path = "data_description.txt"
    rules_path = "operation_description.json"
    api_key = os.getenv("GOOGLE_API_KEY", "AIzaSyDI6yAgr689NOqj2G34cgDr-aa5tv2aO8g")

    if not api_key:
        print("Error: Google API key not found. Please set the GOOGLE_API_KEY environment variable.")
    else:
        operations = analyze_badminton_match(data_path, column_desc_path, rules_path, api_key)
        print("\nFinal operations array:", operations)


Reading badminton match data...
Identified 1 suitable operations:
1. [1, 2, 3, 5, 7, 9, 10, 11, 12, 18, 19]

Final operations array: ['[1, 2, 3, 5, 7, 9, 10, 11, 12, 18, 19]']


將所挑選出來的操作寫入"operations.json"

In [32]:
import json

# 從 JSON 檔案讀取 "operations" 分支
with open("operation_description.json", "r", encoding="utf-8") as f:
    all_data = json.load(f)
    original_operations = all_data["operations"]

# 你想要挑選的 operation 編號（根據實際需求修改這個 list）
selected_numbers = [1, 2, 3, 5, 7, 9, 10, 11, 12, 18, 19]

# 根據 selected_numbers 選出對應操作，並從 1 開始重新編號
filtered_operations = []
for new_number, original_number in enumerate(selected_numbers, start=1):
    for op in original_operations:
        if op["number"] == original_number:
            filtered_operations.append({
                "number": new_number,
                "name": op["name"],
                "description": op["description"]
            })
            break

# 新的 JSON 結構
output_json = {
    "description": "Selected operations for badminton data analysis.",
    "requirements": [
        "The output must be based on the input data ; do not hallucinate.",
        "Give me the list of numbers."
    ],
    "operations": filtered_operations
}

# 寫入 JSON 檔案
with open("operations.json", "w", encoding="utf-8") as f:
    json.dump(output_json, f, ensure_ascii=False, indent=2)

print("operations.json has been created.")


operations.json has been created.


# STEP 5

篩選出最合適的1/2 operations

In [None]:
import pandas as pd
import numpy as np
import dspy
import re
import json
import google.generativeai as genai
import os

# Define Gemini LLM class (same as before)
class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)
    
    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")
        
        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)
        
        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{
                'text': response.text,
                'logprobs': None
            }]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{
                'text': "⚠️ Gemini API 回應失敗,可能已達限額或出現錯誤。",
                'logprobs': None
            }]
    
    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

# Set up Gemini API
def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.configure(lm=lm)
    return lm

def load_operations_from_json(json_file_path):
    """
    Load operations from JSON file
    Returns a list of operation dictionaries and a formatted string list
    """
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        
        operations_data = data.get('operations', [])
        
        # Create formatted operation strings for LLM processing
        operation_strings = []
        operation_details = []
        
        for op in operations_data:
            number = op.get('number', '')
            name = op.get('name', '')
            description = op.get('description', '')
            
            # Format as: "number. name: description"
            if number and name and description:
                formatted_op = f"{number}. {name}: {description}"
                operation_strings.append(formatted_op)
                operation_details.append({
                    'number': number,
                    'name': name,
                    'description': description,
                    'formatted': formatted_op
                })
        
        print(f"從 {json_file_path} 成功載入 {len(operation_strings)} 個操作")
        return operation_details, operation_strings
        
    except FileNotFoundError:
        print(f"錯誤: 找不到文件 {json_file_path}")
        return [], []
    except json.JSONDecodeError:
        print(f"錯誤: {json_file_path} 不是有效的 JSON 文件")
        return [], []
    except Exception as e:
        print(f"載入操作時發生錯誤: {e}")
        return [], []

class OperationFilter:
    def __init__(self, gemini_api_key):
        self.gemini_lm = Gemini(api_key=gemini_api_key)
    
    def forward(self, operations_list, operation_details, data_info, removal_percentage=0.25):
        """
        Use Gemini to filter out inappropriate operations
        """
        operations_count = len(operations_list)
        operations_to_remove = int(operations_count * removal_percentage)
        operations_to_keep = operations_count - operations_to_remove
        
        # Create numbered list of operations for easier reference
        numbered_operations = "\n".join([f"{i+1}. {op}" for i, op in enumerate(operations_list)])
        
        prompt = f"""
        我有 {operations_count} 個用於分析羽球比賽資料的操作：

        操作清單：
        {numbered_operations}

        資料集資訊：
        {data_info}

        我需要移除 {operations_to_remove} 個最不適合或最不相關的操作（約 {removal_percentage*100:.0f}%）。

        請基於以下標準分析每個操作並識別應該移除哪些操作：
        1. 與實際可用資料欄位的相關性
        2. 在給定資料集結構下的可行性  
        3. 對羽球比賽分析的實用價值
        4. 避免重複或非常相似的操作

        請保留 {operations_to_keep} 個最適合的操作並清楚列出它們的編號。
        
        請逐步思考為什麼某些操作應該被移除，然後提供要保留操作的編號清單。

        請用以下格式回答：
        分析思路：
        [你的分析]

        保留的操作編號：
        [編號1, 編號2, 編號3, ...]
        """
        
        response = self.gemini_lm.basic_request(prompt)
        print(f"response: {response}")
        # Extract the kept operation numbers from the result
        kept_operation_numbers = self.extract_kept_operation_numbers(response, operation_details)
        
        return kept_operation_numbers, response
    
    def extract_kept_operation_numbers(self, filtered_text, operation_details):
        """
        Extract the operation numbers that should be kept from the LLM response
        """
        kept_numbers = []
        
        print(f"Debug - 正在分析回應文本...")
        
        # Look for the section with kept operation numbers
        lines = filtered_text.split('\n')
        
        # Find the start of the operations list
        start_extracting = False
        for line in lines:
            line = line.strip()
            print(f"Debug - 檢查行: {line}")
            
            # Look for section headers that indicate the start of the kept operations list
            if any(keyword in line.lower() for keyword in ['保留的操作編號', '保留操作編號', '編號', 'numbers']):
                start_extracting = True
                print(f"Debug - 找到標題行，開始提取")
                # 檢查標題行本身是否包含數字
                bracket_match = re.search(r'\[(.*?)\]', line)
                if bracket_match:
                    numbers_text = bracket_match.group(1)
                    print(f"Debug - 在標題行找到方括號內容: {numbers_text}")
                    # Split by comma and extract numbers
                    for item in numbers_text.split(','):
                        number = re.search(r'(\d+)', item.strip())
                        if number:
                            kept_numbers.append(number.group(1))
                            print(f"Debug - 提取到編號: {number.group(1)}")
                    if kept_numbers:  # 如果在標題行找到數字，就不需要繼續了
                        break
                continue
            
            if start_extracting and line:
                # Try to extract numbers from various formats
                # Format 1: [編號1, 編號2, ...]
                bracket_match = re.search(r'\[(.*?)\]', line)
                if bracket_match:
                    numbers_text = bracket_match.group(1)
                    print(f"Debug - 找到方括號內容: {numbers_text}")
                    # Split by comma and extract numbers
                    for item in numbers_text.split(','):
                        number = re.search(r'(\d+)', item.strip())
                        if number:
                            kept_numbers.append(number.group(1))
                            print(f"Debug - 提取到編號: {number.group(1)}")
                    break
                
                # Format 2: Numbered list or comma-separated numbers
                numbers = re.findall(r'\b(\d+)\b', line)
                if numbers:
                    kept_numbers.extend(numbers)
                    print(f"Debug - 從行中提取到編號: {numbers}")
                    break
        
        print(f"Debug - 原始提取的編號: {kept_numbers}")
        
        # Remove duplicates and validate against available operations
        valid_numbers = []
        available_numbers = [str(detail['number']) for detail in operation_details]  # 確保都是字符串
        
        print(f"Debug - 可用的編號: {available_numbers}")
        
        for num in kept_numbers:
            num_str = str(num)  # 確保是字符串
            if num_str in available_numbers and num_str not in valid_numbers:
                valid_numbers.append(num_str)
        
        print(f"Debug - 驗證後的有效編號: {valid_numbers}")
        return valid_numbers

def filter_operations_direct_gemini(api_key, operations_list, operation_details, data_sample, data_info, removal_percentage=0.25):
    """
    Use Gemini API directly to filter operations and return operation numbers
    """
    gemini_lm = Gemini(api_key=api_key)
    
    operations_count = len(operations_list)
    operations_to_remove = int(operations_count * removal_percentage)
    operations_to_keep = operations_count - operations_to_remove
    
    # Create numbered list of operations for easier reference
    numbered_operations = "\n".join([f"{i+1}. {op}" for i, op in enumerate(operations_list)])
    
    prompt = f"""
    我有一個羽球比賽的資料集和 {operations_count} 個分析操作。

    資料樣本:
    {data_sample}

    資料集資訊:
    {data_info}

    操作清單:
    {numbered_operations}

    請幫我分析並移除 {operations_to_remove} 個最不合適的操作（約 {removal_percentage*100:.0f}%），保留 {operations_to_keep} 個最適合的操作。

    請考慮以下標準來決定移除哪些操作：
    1. 與實際資料欄位的相關性
    2. 在給定資料集結構下的可行性
    3. 對羽球比賽分析的實用價值
    4. 避免重複或過於相似的操作

    請先說明你的分析思路，然後**只提供要保留操作的編號**（從操作描述開頭提取的編號）。

    請用以下格式回答：

    分析思路：
    [你的分析]

    保留的操作編號：
    [編號1, 編號2, 編號3, ...]

    確保只列出要保留的 {operations_to_keep} 個操作的編號。
    """
    
    response = gemini_lm.basic_request(prompt)
    
    # Extract kept operation numbers from the response
    kept_operation_numbers = extract_operation_numbers_from_response(response, operation_details)
    
    return kept_operation_numbers, response

def extract_operation_numbers_from_response(response, operation_details):
    """
    Extract the operation numbers to keep from Gemini's response
    """
    kept_numbers = []
    
    #print(f"Debug - 正在分析回應文本...")
    
    # Look for the section with kept operation numbers
    lines = response.split('\n')
    
    # Find the start of the operations list
    start_extracting = False
    for line in lines:
        line = line.strip()
       # print(f"Debug - 檢查行: {line}")
        
        # Look for section headers that indicate the start of the kept operations list
        if any(keyword in line.lower() for keyword in ['保留的操作編號', '保留操作編號', '編號', 'numbers']):
            start_extracting = True
            #print(f"Debug - 找到標題行，開始提取")
            # 檢查標題行本身是否包含數字
            bracket_match = re.search(r'\[(.*?)\]', line)
            if bracket_match:
                numbers_text = bracket_match.group(1)
                #print(f"Debug - 在標題行找到方括號內容: {numbers_text}")
                # Split by comma and extract numbers
                for item in numbers_text.split(','):
                    number = re.search(r'(\d+)', item.strip())
                    if number:
                        kept_numbers.append(number.group(1))
                        #print(f"Debug - 提取到編號: {number.group(1)}")
                if kept_numbers:  # 如果在標題行找到數字，就不需要繼續了
                    break
            continue
        
        if start_extracting and line:
            # Try to extract numbers from various formats
            # Format 1: [編號1, 編號2, ...]
            bracket_match = re.search(r'\[(.*?)\]', line)
            if bracket_match:
                numbers_text = bracket_match.group(1)
                #print(f"Debug - 找到方括號內容: {numbers_text}")
                # Split by comma and extract numbers
                for item in numbers_text.split(','):
                    number = re.search(r'(\d+)', item.strip())
                    if number:
                        kept_numbers.append(number.group(1))
                       # print(f"Debug - 提取到編號: {number.group(1)}")
                break
            
            # Format 2: Numbered list or comma-separated numbers
            numbers = re.findall(r'\b(\d+)\b', line)
            if numbers:
                kept_numbers.extend(numbers)
                #print(f"Debug - 從行中提取到編號: {numbers}")
                break
    
    #print(f"Debug - 原始提取的編號: {kept_numbers}")
    
    # Remove duplicates and validate against available operations
    valid_numbers = []
    available_numbers = [str(detail['number']) for detail in operation_details]  # 確保都是字符串
    
    #print(f"Debug - 可用的編號: {available_numbers}")
    
    for num in kept_numbers:
        num_str = str(num)  # 確保是字符串
        if num_str in available_numbers and num_str not in valid_numbers:
            valid_numbers.append(num_str)
    
    #print(f"Debug - 驗證後的有效編號: {valid_numbers}")
    return valid_numbers

def get_data_summary(dataframe):
    """
    Generate a comprehensive summary of the dataset
    """
    summary = f"""
    資料集概要:
    - 總行数: {dataframe.shape[0]}
    - 總列数: {dataframe.shape[1]}
    - 欄位名稱: {', '.join(dataframe.columns)}
    
    各欄位資訊:
    """
    
    for col in dataframe.columns:
        col_info = f"  - {col}: "
        if dataframe[col].dtype in ['object', 'string']:
            unique_values = dataframe[col].unique()[:10]  # Show first 10 unique values
            col_info += f"類別型資料, 獨特值範例: {', '.join(map(str, unique_values))}"
        else:
            col_info += f"數值型資料, 範圍: {dataframe[col].min()} - {dataframe[col].max()}"
        
        summary += col_info + "\n"
    
    return summary

def filter_badminton_operations(operations_list, operation_details, dataframe, api_key, removal_percentage=0.25):
    """
    Main function to filter operations using Gemini LLM and return operation numbers
    """
    print(f"原始操作數量: {len(operations_list)}")
    print("原始操作清單:")
    for i, op in enumerate(operations_list, 1):
        print(f"  {i}. {op}")
    
    # Get data summary
    data_summary = get_data_summary(dataframe)
    data_sample = dataframe.to_string()  # Use only first 5 rows for sample
    
    print(f"\n使用 Gemini LLM 過濾操作 (移除 {removal_percentage*100:.0f}%)...")
    
    # Method 1: Use custom filter class with Gemini
    try:
        print("方法1: 使用自定義過濾器 + Gemini...")
        filter_module = OperationFilter(api_key)
        dspy_filtered_numbers, dspy_response = filter_module.forward(
            operations_list, 
            operation_details,
            data_summary, 
            removal_percentage
        )
        print(f"自定義過濾器方法保留了 {len(dspy_filtered_numbers)} 個操作編號")
        
        print("\n自定義過濾器回應:")
        print("="*30)
        print(dspy_response[:500] + "..." if len(dspy_response) > 500 else dspy_response)
        print("="*30)
        
    except Exception as e:
        print(f"自定義過濾器方法失敗: {e}")
        dspy_filtered_numbers = []
    
    # Method 2: Direct Gemini API call
    try:
        print("方法2: 直接使用 Gemini API...")
        direct_filtered_numbers, gemini_response = filter_operations_direct_gemini(
            api_key, 
            operations_list, 
            operation_details,
            data_sample, 
            data_summary, 
            removal_percentage
        )
        print(f"直接 API 方法保留了 {len(direct_filtered_numbers)} 個操作編號")
        
        print("\nGemini 回應:")
        print("="*50)
        print(gemini_response)
        print("="*50)
        
    except Exception as e:
        print(f"直接 API 方法失敗: {e}")
        direct_filtered_numbers = []
    
    # Combine results (prefer direct method if both work, fallback to custom filter)
    if direct_filtered_numbers:
        final_operation_numbers = direct_filtered_numbers
        print(f"\n使用直接 API 方法的結果")
    elif dspy_filtered_numbers:
        final_operation_numbers = dspy_filtered_numbers
        print(f"\n使用自定義過濾器方法的結果")
    else:
        # Fallback: keep random subset of operation numbers
        target_count = int(len(operations_list) * (1 - removal_percentage))
        final_operation_numbers = [detail['number'] for detail in operation_details[:target_count]]
        print(f"\n兩種方法都失敗，使用前 {target_count} 個操作編號作為備案")
    
    print(f"\n最終保留的操作編號 ({len(final_operation_numbers)} 個):")
    for i, number in enumerate(final_operation_numbers, 1):
        # Find the operation details for display
        for detail in operation_details:
            if detail['number'] == number:
                print(f"  {i}. 編號 {number}: {detail['name']}")
                break
    
    return final_operation_numbers

# Example usage
if __name__ == "__main__":
    # Load operations from JSON file
    json_file_path = "operations.json"
    operation_details, operation_strings = load_operations_from_json(json_file_path)
    
    if not operation_strings:
        print("無法載入操作，程式結束")
        exit(1)
    
    print(f"\n載入的操作詳情:")
    for i, detail in enumerate(operation_details[:5], 1):  # Show first 5 as example
        print(f"  {i}. 編號: {detail['number']}, 名稱: {detail['name']}")
        print(f"      描述: {detail['description']}")
    
    if len(operation_details) > 5:
        print(f"  ... 還有 {len(operation_details) - 5} 個操作")
    
    # Load badminton data
    try:
        example_df = pd.read_csv('set1.csv')
        print(f"\n成功載入資料集: {example_df.shape[0]} 行, {example_df.shape[1]} 列")
    except FileNotFoundError:
        print("錯誤: 找不到 set1.csv 文件")
        exit(1)
    except Exception as e:
        print(f"載入資料集時發生錯誤: {e}")
        exit(1)
    
    # Your Gemini API key
    api_key = os.getenv("GOOGLE_API_KEY")
    
    # Filter operations and get operation numbers
    filtered_operation_numbers = filter_badminton_operations(
        operation_strings,  # Use the formatted strings for LLM processing
        operation_details,  # Pass the operation details for number extraction
        example_df, 
        api_key, 
        removal_percentage=0.2
    )
    
    print(f"\n操作過濾完成！從 {len(operation_strings)} 個操作減少到 {len(filtered_operation_numbers)} 個操作。")
    print(f"保留的操作編號清單: {filtered_operation_numbers}")

    # 從 JSON 檔案讀取 "operations" 分支
    with open("operations.json", "r", encoding="utf-8") as f:
        all_data = json.load(f)
        original_operations = all_data["operations"]

    # 你想要挑選的 operation 編號（根據實際需求修改這個 list）
    selected_numbers = [int(num) for num in filtered_operation_numbers]
    print(f"selected_numbers: {selected_numbers}")
    print(f"original_operations: {original_operations}")
    # 根據 selected_numbers 選出對應操作，並從 1 開始重新編號
    filtered_operations = []
    for new_number, original_number in enumerate(selected_numbers, start=1):
        for op in original_operations:
            if op["number"] == original_number:
                filtered_operations.append({
                    "number": new_number,
                    "name": op["name"],
                    "description": op["description"]
                })
                break
    #print(f"filtered_operations: {filtered_operations}")
    # 新的 JSON 結構
    output_json = {
        "description": "Selected operations for badminton data analysis.",
        "requirements": [
            "The output must be based on the input data ; do not hallucinate.",
            "Give me the list of numbers."
        ],
        "operations": filtered_operations
    }

    # 寫入 JSON 檔案
    with open("filtered_operations.json", "w", encoding="utf-8") as f:
        json.dump(output_json, f, ensure_ascii=False, indent=2)

    print("filtered_operations.json has been created.")


從 operations.json 成功載入 11 個操作

載入的操作詳情:
  1. 編號: 1, 名稱: View data
      描述: Inspect dataset preview and structure.
  2. 編號: 2, 名稱: Add column
      描述: Add new features, e.g., rally length or is_win.
  3. 編號: 3, 名稱: Delete column
      描述: Remove irrelevant or noisy fields.
  4. 編號: 4, 名稱: Fill missing values
      描述: Replace NaN with default or computed values.
  5. 編號: 5, 名稱: Column statistics
      描述: Compute stats like average rally time or shot count.
  ... 還有 6 個操作

成功載入資料集: 315 行, 30 列
原始操作數量: 11
原始操作清單:
  1. 1. View data: Inspect dataset preview and structure.
  2. 2. Add column: Add new features, e.g., rally length or is_win.
  3. 3. Delete column: Remove irrelevant or noisy fields.
  4. 4. Fill missing values: Replace NaN with default or computed values.
  5. 5. Column statistics: Compute stats like average rally time or shot count.
  6. 6. Sort data: Sort by duration, player, or score for ranking or filtering.
  7. 7. Filter data: Use logical conditions to narrow down the 

# STEP 6

根據真實table只保留重要70%操作，保留'write' 'select_col' 'select_row'三個重要操作，到'selected_operations.json'

操作提取已完成!!

In [None]:
# 正式版 - 使用 Gemini 過濾羽球比賽分析操作
import pandas as pd
import numpy as np
import dspy
import re
import json
import google.generativeai as genai
import os

class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)

    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")
        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)
        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{ 'text': response.text, 'logprobs': None }]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{ 'text': "⚠️ Gemini API 回應失敗,可能已達限額或出現錯誤。", 'logprobs': None }]

    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.settings.configure(lm=lm)
    return lm

def load_operations_from_json(json_file_path):
    try:
        with open(json_file_path, 'r', encoding='utf-8') as f:
            data = json.load(f)
        operations_data = data if isinstance(data, dict) else {}
        operation_details = []
        operation_strings = []

        for key, op in operations_data.items():
            name = op.get('operation', '')
            desc = op.get('description', '')
            if key and name and desc:
                formatted_op = f"{key}. {name}: {desc}"
                operation_strings.append(formatted_op)
                operation_details.append({
                    'number': key,
                    'name': name,
                    'description': desc,
                    'formatted': formatted_op
                })

        print(f"從 {json_file_path} 成功載入 {len(operation_strings)} 個操作")
        return operation_details, operation_strings
    except Exception as e:
        print(f"❌ 載入 operations_info.json 時發生錯誤: {e}")
        return [], []

def get_data_summary(dataframe):
    summary = f"資料集概要:\n- 總行數: {dataframe.shape[0]}\n- 總列數: {dataframe.shape[1]}\n- 欄位名稱: {', '.join(dataframe.columns)}\n\n各欄位資訊:\n"
    for col in dataframe.columns:
        summary += f"  - {col}: "
        if dataframe[col].dtype in ['object', 'string']:
            summary += f"類別型資料, 獨特值範例: {', '.join(map(str, dataframe[col].unique()[:10]))}\n"
        else:
            summary += f"數值型資料, 範圍: {dataframe[col].min()} - {dataframe[col].max()}\n"
    return summary

def read_text_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return file.read()

def extract_operation_numbers_from_response(response):
    # 使用正则表达式匹配数组
    pattern = r'```\s*\[([\d,\s]+)\]\s*```'
    match = re.search(pattern, response)

    if match:
        # 提取匹配的数组字符串并转换为列表
        array_str = match.group(1)
        operation_list = [int(num) for num in array_str.replace(' ', '').split(',')]
        print(operation_list)
        return operation_list
    else:
        print("未找到排序数组")
    
def filter_badminton_operations(operation_details, operation_strings, df, api_key, outline_path='outline.txt'):
    gemini = Gemini(api_key=api_key)
    data_summary = get_data_summary(df)
    data_sample = df.to_string()
    outline = read_text_file(outline_path)
    print(f"operation_length: {len(operation_strings)}")
    prompt = f"""
    我有一個撰寫新聞的大鋼與比賽的資料集和 {len(operation_strings)} 個分析操作，請依據操作重要性排序(由高到低)。

    大綱:
    {outline}

    資料樣本:
    {data_sample}

    資料集資訊:
    {data_summary}

    操作清單:
    {'\n'.join(operation_strings)}

    請先根據chain_of_thought分析，將操作編號根據重要性排序，每個編號僅在陣列中出現一次，陣列長度應為{len(operation_strings)}:
    [1, 2, 3, ...]
    """
    response = gemini.basic_request(prompt)
    #print(response)
    return extract_operation_numbers_from_response(response), response

def main():
    json_file_path = "operations_info.json"
    operation_details, operation_strings = load_operations_from_json(json_file_path)
    if not operation_details:
        return
    try:
        df = pd.read_csv("filtered_set1.csv")
    except Exception as e:
        print(f"❌ 載入資料錯誤: {e}")
        return
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        print("❌ GOOGLE_API_KEY 未設定")
        return
    kept_numbers, response = filter_badminton_operations(operation_details, operation_strings, df, api_key, outline_path = 'main.txt')
    print(f"response: {response}")
    #kept_numbers = list(set(kept_numbers) | {1, 2, 3})
    print("排序操作重要性編號:", kept_numbers)
    kept_numbers = list(set(kept_numbers[:int(0.7*len(kept_numbers))]) | {1,2,3})
    new_operations = []
    print(f"operation_details: {operation_details}")
    for new_id, num in enumerate(kept_numbers, 1):
        for detail in operation_details:
            if int(detail['number']) == int(num):
                new_operations.append({
                    'number': new_id,
                    'name': detail['name'],
                    'description': detail['description']
                })
                break
    output_json = new_operations
    
    with open("selected_operations.json", "w", encoding="utf-8") as f:
        json.dump(output_json, f, indent=2, ensure_ascii=False)
    print("✅ selected_operations.json has been created.")

if __name__ == "__main__":
    main()

從 operations_info.json 成功載入 15 個操作
operation_length: 15
[1, 2, 4, 5, 3, 6, 7, 9, 10, 8, 13, 12, 14, 15, 11]
response: 好的，我將根據您的要求，使用鏈式思考的方式，分析並排序您提供的 15 個分析操作，並輸出一個由操作編號組成的陣列，按照重要性由高到低排列。

**鏈式思考分析：**

首先，我需要理解作為一個資深羽毛球新聞記者，讀者最想看到什麼樣的資訊，以及哪些分析操作能提供這些資訊。

1.  **比賽結果和基本資訊：** 讀者首先關心的是誰贏了，以及比賽的總體情況。

2.  **關鍵球員表現和戰術：** 讀者會想知道關鍵球員在比賽中的表現如何，使用了哪些戰術，以及這些戰術是否有效。

3.  **失誤分析：** 分析失誤的原因可以幫助讀者了解比賽的轉折點和球員的弱點。

4.  **進階分析：** 結合時間、回合得分等因素，分析比賽的節奏、球員的體能變化等。

基於以上思考，我將各個操作的重要性排序如下：

*   **極高重要性：**

    *   **1. write:** 這個操作能將分析結果轉化為新聞報導，是最終呈現給讀者的形式。
    *   **2. select_row:** 選擇特定條件的行，例如特定球員的數據，是深入分析的基礎。
    *   **4. group_by:** 將數據按照不同的類型分組，例如按照球員、回合等分組，是進行統計分析的前提。
    *   **5. aggregate:** 統計不同組別的數據，例如統計每個球員的得分、失誤等，提供關鍵資訊。
*   **高重要性：**

    *   **3. select_column:** 選擇特定的列，例如選擇球的類型、失誤原因等，是聚焦分析的必要步驟。
    *   **6. value_counts:** 統計各種類型的球、失誤原因等的出現次數，提供直觀的數據。
    *   **7. crosstab:** 交叉分析不同因素之間的關係，例如球的類型和得分球員之間的關係。
*   **中等重要性：**

    *   **9. sort:** 按照時間排序，可以分析比賽的進程。
    *   **10. calculate:** 計算新的

新增重要operations(select_row, select_col, write) 到'selected_operations.json'

In [44]:
import json

# 讀取 filtered_operation.json
with open('filtered_operations.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# 取得目前最大 number
existing_numbers = [op['number'] for op in data['operations']]
max_number = max(existing_numbers)

# 新增的 operations
new_operations = [
    {
        "number": max_number + 1,
        "name": "select_row",
        "description": "Selects rows based on their row indices."
    },
    {
        "number": max_number + 2,
        "name": "select_col",
        "description": "Selects columns based on their column names."
    },
    {
        "number": max_number + 3,
        "name": "write",
        "description": "If the table is small enough, generates text based on the tables using the LLM; represents the leaf node of the tree."
    }
]

# 將新操作加入原始資料
data['operations'].extend(new_operations)

# 寫回 JSON 檔
with open('selected_operations.json', 'w', encoding='utf-8') as f:
    json.dump(data, f, ensure_ascii=False, indent=2)

# 輸出所有操作的 number 列表
all_numbers = [op['number'] for op in data['operations']]
print(all_numbers)


[1, 2, 3, 4, 5, 6, 7, 8, 9]


# STEP 7

根據Table,得到要執行的操作與參數

In [None]:
import pandas as pd
import json
import google.generativeai as genai
import os

class ContentPlanner:
    def __init__(self, api_key):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        
    def generate_operations(self, tables, table_description, operation_description, 
                          operation_history, operation_pool, max_depth=5, max_degree=3,outline_path='main.txt'):
        """
        使用Gemini生成operations和arguments
        """
        
        # 構建完整的提示詞
        prompt = f"""System : You are a content planner for the report. Please follow the outline. Please select candidate Operations and corresponding Arguments from the Operation Pool based on the input Tables and Operation History. These candidate Operations will be the next Operation in the Operation History .

# Requirements
1. Strictly adhere to the requirements .
2. The output must be in English .
3. The output must be based on the input data ; do not hallucinate .
4. The length of Operation History must be less than or equal to {max_depth}.
5. The number of Operations must be less than or equal to {max_degree}.
6. Only select Opertions from the Operation Pool .
7. Arguments must match the format required by the corresponding Operations .
8. Operations & Arguments must follow this format : [ operation_1 ( argument_1 , ...) , operation_2 ( argument_2 , ...) , operation_3 ( argument_3 , ...) , ...]
9. Only output Operations & Arguments !
10. If Table is big or Level is low, it should be more Operations include select_col or select_row not write.
11. If the length of Operation History is short, then more operations or more arguments.
12. Write operations do not need argument.

#outline
{read_text_file(outline_path)}

# Table Description
{table_description}

# Operation Description
{json.dumps(operation_description, indent=2, ensure_ascii=False)}

User : # Test
## Tables
{tables}

## Operation History
{operation_history}

## Operation Pool
{operation_pool}

## Operations & Arguments"""

        try:
            print("正在向Gemini發送請求...")
            response = self.model.generate_content(prompt)
            
            if response.text:
                print("成功獲得Gemini回應")
                return response.text.strip()
            else:
                print("Gemini回應為空")
                return None
                
        except Exception as e:
            print(f"Gemini API請求失敗: {e}")
            return None
        
def read_json_file(file_path):
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return json.load(file)

    
# 設置API密鑰
api_key = os.getenv("GOOGLE_API_KEY")
print("Content Planner for Badminton Game Report")
print("="*50)

# 檢查是否有必要的文件
#required_files = ['filtered_set1.csv', 'filtered_data_description.txt', 'filtered_operations.json']

print("正在載入數據...")

# 讀取CSV檔案
TABLES = pd.read_csv('filtered_set1.csv')
tables_str = TABLES.to_string()
print(f"成功載入CSV: {TABLES.shape[0]} 行, {TABLES.shape[1]} 列")

# 讀取表格描述
TABLE_DESCRIPTION = read_text_file("filtered_data_description.txt")
if not TABLE_DESCRIPTION:
    TABLE_DESCRIPTION = "No table description available"
print(f"載入表格描述: {len(TABLE_DESCRIPTION)} 字符")

# 讀取操作描述
OPERATION_DESCRIPTION = read_json_file("selected_operations.json")
print(f"載入操作描述: {len(OPERATION_DESCRIPTION)} 個項目")

# 設置其他變數
MAX_DEPTH = 5
MAX_DEGREE = 5
OPERATION_HISTORY = ['root(None)']
Level = 0
# 從操作描述中提取操作池
#if 'operations' in OPERATION_DESCRIPTION:
#    OPERATION_POOL = [op['name'] for op in OPERATION_DESCRIPTION['operations']]
#else:
#    OPERATION_POOL = list(OPERATION_DESCRIPTION.keys())
OPERATION_POOL = [op['name'] for op in OPERATION_DESCRIPTION]

print(f"操作池: {OPERATION_POOL}")
print(f"操作歷史: {OPERATION_HISTORY}")
    


# 初始化內容規劃器
planner = ContentPlanner(api_key)

# 生成操作和參數
print("\n開始生成操作和參數...")
operations_and_arguments = planner.generate_operations(
    tables=tables_str,
    table_description=TABLE_DESCRIPTION,
    operation_description=OPERATION_DESCRIPTION,
    operation_history=OPERATION_HISTORY,
    operation_pool=OPERATION_POOL,
    max_depth=MAX_DEPTH,
    max_degree=MAX_DEGREE,
    outline_path='analyze_response.txt'
)
OPERATION_HISTORY = OPERATION_HISTORY.append(operations_and_arguments)
Level +=1

if operations_and_arguments:
    print("\n" + "="*50)
    print("GEMINI 輸出結果:")
    print("="*50)
    print(operations_and_arguments)
    print("="*50)



Content Planner for Badminton Game Report
正在載入數據...
成功載入CSV: 315 行, 9 列
載入表格描述: 494 字符
載入操作描述: 10 個項目
操作池: ['write', 'select_row', 'select_column', 'group_by', 'aggregate', 'value_counts', 'crosstab', 'pivot_table', 'sort', 'calculate']
操作歷史: ['root(None)']

開始生成操作和參數...
正在向Gemini發送請求...
成功獲得Gemini回應

GEMINI 輸出結果:
[select_column(type, lose_reason, getpoint_player), value_counts(type), value_counts(lose_reason)]


In [33]:
print(operations_and_arguments)

[select_column(type, lose_reason, getpoint_player), value_counts(type), value_counts(lose_reason)]


解析LLM response內容

In [36]:
# 原始字符串
test_str = operations_and_arguments

# 提取方括号内的内容
start = test_str.find('[') + 1
end = test_str.rfind(']')
content = test_str[start:end].strip()

elements = []
current = []
stack = 0

# 遍历字符进行解析
for char in content:
    if char == '(':
        stack += 1
        current.append(char)
    elif char == ')':
        stack -= 1
        current.append(char)
    elif char == ',' and stack == 0:
        elements.append(''.join(current).strip())
        current = []
    else:
        current.append(char)

# 添加最后一个元素
if current:
    elements.append(''.join(current).strip())

print(elements)

['select_column(type, lose_reason, getpoint_player)', 'value_counts(type)', 'value_counts(lose_reason)']


# STEP 8

根據欄位型態與'operation_name' 和 'operation_argument'，請LLM撰寫可以執行的操作程式碼

取欄位型態

In [2]:
import pandas as pd
df_copy = pd.read_csv("filtered_set1.csv")
df = df_copy
print(df.dtypes)


Unnamed: 0          int64
rally               int64
time               object
roundscore_A        int64
roundscore_B        int64
player             object
type               object
lose_reason        object
getpoint_player    object
dtype: object


In [15]:
import dspy
import google.generativeai as genai
import pandas as pd
import ast
import re
import os
elements = ['select_column(type, lose_reason, getpoint_player)', 'value_counts(type)', 'value_counts(lose_reason)']

class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)

    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")
        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)
        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{'text': response.text, 'logprobs': None}]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{'text': "⚠️ Gemini API 回應失敗", 'logprobs': None}]

    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.settings.configure(lm=lm)
    return lm

import re
import pandas as pd

class DataFrameOperator:
    def __init__(self, api_key):
        self.lm = setup_gemini_api(api_key)

    def generate_code(self, operation, df_info, df_path):
        prompt = f"""
        你是一個專業的Python資料分析助手。欄位名稱以資料欄位類型提供為主，根據以下要求生成操作DataFrame的程式碼：

        要執行的操作: {operation}

        CSV數據集: {df_path}

        資料欄位類型:
        {df_info}

        生成要求：
        讀取CSV數據集，並存入DataFrame後，使用要執行的操作後，將修改後的DataFrame存入'tmp.csv'，撰寫完整python code.
        切忌每個操作參數都需要使用

        輸出格式：
        ```python
        # 你的程式碼
        ```
        """
        return self.lm.basic_request(prompt)

    def safe_execute(self, code, df):
        try:
            code_block = re.search(r'```python\n(.*?)\n```', code, re.DOTALL)
            if code_block:
                code = code_block.group(1)

            # 寫入暫存 CSV 檔案作為模擬 df.csv 路徑
            df.to_csv("input_tmp.csv", index=False)

            # 建立安全執行環境
            exec_globals = {'pd': pd}
            exec_locals = {}

            # 執行生成的程式碼
            exec(code, exec_globals, exec_locals)

            # 從 tmp.csv 讀取處理後的結果
            result_df = pd.read_csv("tmp.csv")
            return result_df

        except Exception as e:
            print(f"執行錯誤: {str(e)}")
            return df



# 初始化
API_KEY = os.getenv("GOOGLE_API_KEY")
operator = DataFrameOperator(API_KEY)

# 獲取資料資訊
df_info = df.info()
df_path = "filtered_set1.csv"
operation_def = elements[0]
#print(operation)

generated_code = operator.generate_code(
    operation=operation_def,
    df_info=df_info,
    df_path=df_path
)

print("生成的程式碼：")
print(generated_code)

# 執行操作
processed_df = operator.safe_execute(generated_code, df)

print("\n處理結果：")
print(processed_df)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 315 entries, 0 to 314
Data columns (total 9 columns):
 #   Column           Non-Null Count  Dtype 
---  ------           --------------  ----- 
 0   Unnamed: 0       315 non-null    int64 
 1   rally            315 non-null    int64 
 2   time             315 non-null    object
 3   roundscore_A     315 non-null    int64 
 4   roundscore_B     315 non-null    int64 
 5   player           315 non-null    object
 6   type             315 non-null    object
 7   lose_reason      36 non-null     object
 8   getpoint_player  36 non-null     object
dtypes: int64(4), object(5)
memory usage: 22.3+ KB
生成的程式碼：
```python
import pandas as pd

def select_column(df, type_col, lose_reason_col, getpoint_player_col):
  """
  從 DataFrame 中選擇指定的欄位。

  Args:
    df: pandas DataFrame.
    type_col: 類型欄位的名稱 (字串).
    lose_reason_col: 失敗原因欄位的名稱 (字串).
    getpoint_player_col: 得分球員欄位的名稱 (字串).

  Returns:
    一個新的 DataFrame，僅包含指定的欄位。
  """
  return df[[type_col,

In [None]:
import random

class TreeNode:
    def __init__(self, value):
        self.value = value
        self.children = []
        self.level = Level
        self.text = text
        self.table = table
        self.
    def __repr__(self):
        return f"TreeNode({self.value})"

def build_random_tree(current_depth=1, max_depth=5, max_degree=5, value_counter=[0]):
    value_counter[0] += 1
    node = TreeNode(value_counter[0])

    if current_depth >= max_depth or random.random() < 0.3:
        return node  # 葉節點

    degree = random.randint(1, max_degree)
    print(degree)
    for _ in range(degree):
        child = build_random_tree(current_depth + 1, max_depth, max_degree, value_counter)
        node.children.append(child)

    return node

def print_tree(node, level=0):
    print("  " * level + f"- Node({node.value})")
    for child in node.children:
        print_tree(child, level + 1)

# 建立並印出隨機樹
random.seed(42)  # 可重現性
root = build_random_tree()
print_tree(root)


1
2
5
1
5
- Node(1)
  - Node(2)
    - Node(3)
    - Node(4)
      - Node(5)
      - Node(6)
        - Node(7)
      - Node(8)
      - Node(9)
      - Node(10)
        - Node(11)
        - Node(12)
        - Node(13)
        - Node(14)
        - Node(15)


# STEP final

In [None]:
import pandas as pd
import json
import google.generativeai as genai
import os
import dspy
import ast
import re
from typing import List, Dict, Any, Optional
import copy

# ===== 基於參考程式碼的函數 =====
def read_text_file(file_path):
    """讀取文本文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        return "No file available"
    except Exception as e:
        print(f"讀取文件錯誤: {e}")
        return "Error reading file"

def read_json_file(file_path):
    """讀取JSON文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return json.load(file)
    except FileNotFoundError:
        # 返回默認操作集合
        return [
            {"name": "select_column", "description": "選擇特定欄位"},
            {"name": "value_counts", "description": "計算值的頻次"},
            {"name": "groupby", "description": "按欄位分組"},
            {"name": "sort_values", "description": "排序數據"},
            {"name": "filter_rows", "description": "過濾行數據"},
            {"name": "write", "description": "撰寫分析文本"}
        ]

# ===== 樹節點類別 =====
class TreeNode:
    """樹節點類別，包含 children, level, text, table 屬性"""
    def __init__(self, level: int = 0, text: str = "", table: pd.DataFrame = None, operation: str = None):
        self.children: List['TreeNode'] = []
        self.level: int = level
        self.text: str = text
        self.table: pd.DataFrame = table if table is not None else pd.DataFrame()
        self.operation: str = operation
        self.parent: Optional['TreeNode'] = None
        self.operation_history: List[str] = []
    
    def add_child(self, child: 'TreeNode'):
        """添加子節點"""
        child.parent = self
        self.children.append(child)
    
    def is_leaf(self) -> bool:
        """判斷是否為葉節點"""
        return len(self.children) == 0

# ===== 基於參考程式碼的 ContentPlanner =====
class ContentPlanner:
    def __init__(self, api_key):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        
    def generate_operations(self, tables, table_description, operation_description, 
                          operation_history, operation_pool, max_depth=5, max_degree=3, outline_path='main.txt'):
        """
        使用Gemini生成operations和arguments
        """
        
        # 構建完整的提示詞
        prompt = f"""System : You are a content planner for the report. Please follow the outline. Please select candidate Operations and corresponding Arguments from the Operation Pool based on the input Tables and Operation History. These candidate Operations will be the next Operation in the Operation History .

# Requirements
1. Strictly adhere to the requirements .
2. The output must be in English .
3. The output must be based on the input data ; do not hallucinate .
4. The length of Operation History must be less than or equal to {max_depth}.
5. The number of Operations must be less than or equal to {max_degree}.
6. Only select Opertions from the Operation Pool .
7. Arguments must match the format required by the corresponding Operations .
8. Operations & Arguments must follow this format : [ operation_1 ( argument_1 , ...) , operation_2 ( argument_2 , ...) , operation_3 ( argument_3 , ...) , ...]
9. Only output Operations & Arguments !
10. If Table is big or Level is low, it should be more Operations include select_col or select_row not write.
11. If the length of Operation History is short, then more operations or more arguments.
12. Write operations do not need argument.

#outline
{read_text_file(outline_path) if os.path.exists(outline_path) else "Generate comprehensive data analysis"}

# Table Description
{table_description}

# Operation Description
{json.dumps(operation_description, indent=2, ensure_ascii=False)}

User : # Test
## Tables
{tables}

## Operation History
{operation_history}

## Operation Pool
{operation_pool}

## Operations & Arguments"""

        try:
            print("正在向Gemini發送請求...")
            response = self.model.generate_content(prompt)
            
            if response.text:
                print("成功獲得Gemini回應")
                return self.parse_operations(response.text.strip())
            else:
                print("Gemini回應為空")
                return []
                
        except Exception as e:
            print(f"Gemini API請求失敗: {e}")
            return []
    
    def parse_operations(self, response_text):
        """解析 Gemini 回應的操作列表"""
        try:
            # 尋找方括號內的內容
            bracket_match = re.search(r'\[(.*?)\]', response_text, re.DOTALL)
            if bracket_match:
                operations_str = bracket_match.group(1)
                # 分割操作
                operations = []
                # 使用正則表達式分割操作
                op_pattern = r'([a-zA-Z_]+\([^)]*\))'
                matches = re.findall(op_pattern, operations_str)
                if matches:
                    return matches
                else:
                    # 如果沒有匹配，嘗試簡單分割
                    ops = [op.strip().strip(',') for op in operations_str.split(',')]
                    return [op for op in ops if op and op != '']
            else:
                # 如果沒有方括號，按行分割
                lines = response_text.split('\n')
                operations = []
                for line in lines:
                    line = line.strip()
                    if line and not line.startswith('#') and not line.startswith('System'):
                        operations.append(line)
                return operations
        except Exception as e:
            print(f"解析操作失敗: {e}")
            return []

# ===== 基於參考程式碼的 Gemini 和 DataFrameOperator =====
class Gemini(dspy.LM):
    def __init__(self, api_key, model_name="gemini-2.0-flash"):
        self.api_key = api_key
        self.model_name = model_name
        genai.configure(api_key=api_key)
        self._model_instance = genai.GenerativeModel(model_name)
        super().__init__(model=model_name)

    def __call__(self, messages=None, **kwargs):
        if messages is None:
            raise ValueError("Missing 'messages' argument")
        if isinstance(messages, list):
            prompt_text = "".join([msg.get('content', '') for msg in messages])
        else:
            prompt_text = str(messages)
        try:
            response = self._model_instance.generate_content(prompt_text)
            if not response.text:
                raise ValueError("Empty response from Gemini")
            return [{'text': response.text, 'logprobs': None}]
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return [{'text': "⚠️ Gemini API 回應失敗", 'logprobs': None}]

    def basic_request(self, prompt, **kwargs):
        try:
            response = self._model_instance.generate_content(prompt)
            return response.text
        except Exception as e:
            print(f"Error from Gemini model: {e}")
            return "⚠️ 無法取得 Gemini 回應"

def setup_gemini_api(api_key):
    lm = Gemini(api_key=api_key)
    dspy.settings.configure(lm=lm)
    return lm

class DataFrameOperator:
    def __init__(self, api_key):
        self.lm = setup_gemini_api(api_key)

    def generate_code(self, operation, df_info, df_path="input_tmp.csv"):
        prompt = f"""
        你是一個專業的Python資料分析助手。欄位名稱以資料欄位類型提供為主，根據以下要求生成操作DataFrame的程式碼：

        要執行的操作: {operation}

        CSV數據集: {df_path}

        資料欄位類型:
        {df_info}

        生成要求：
        讀取CSV數據集，並存入DataFrame後，使用要執行的操作後，將修改後的DataFrame存入'tmp.csv'，撰寫完整python code.
        切忌每個操作參數都需要使用

        輸出格式：
        ```python
        # 你的程式碼
        ```
        """
        return self.lm.basic_request(prompt)

    def safe_execute(self, code, df):
        try:
            code_block = re.search(r'```python\n(.*?)\n```', code, re.DOTALL)
            if code_block:
                code = code_block.group(1)

            # 寫入暫存 CSV 檔案作為模擬 df.csv 路徑
            df.to_csv("input_tmp.csv", index=False)

            # 建立安全執行環境
            exec_globals = {'pd': pd}
            exec_locals = {}

            # 執行生成的程式碼
            exec(code, exec_globals, exec_locals)

            # 從 tmp.csv 讀取處理後的結果
            if os.path.exists("tmp.csv"):
                result_df = pd.read_csv("tmp.csv")
                return result_df
            else:
                return df

        except Exception as e:
            print(f"執行錯誤: {str(e)}")
            return df

# ===== 文本生成器 =====
# 修改 TextGenerator 類別：初始化時接收 table_description
import time  # 確保導入 time 模組

class TextGenerator:
    def __init__(self, api_key, table_description=""):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.table_description = table_description

    def _retry_generate(self, prompt, max_retries=3, delay_seconds=30):
        for attempt in range(max_retries):
            try:
                response = self.model.generate_content(prompt)
                if response.text:
                    return response.text.strip()
            except Exception as e:
                err = str(e)
                print(f"Gemini 回應失敗: {err}")
                if "429" in err:
                    print(f"已達配額限制，等待 {delay_seconds} 秒後重試 ({attempt+1}/{max_retries})...")
                    time.sleep(delay_seconds)
                else:
                    break
        return "⚠️ 寫作請求失敗：API 限制或其他錯誤"

    def generate_text_for_write_operation(self, table: pd.DataFrame, operation_history: List[str]) -> str:
        table_str = table.head(10).to_string() if len(table) > 10 else table.to_string()
        WRITE_TOKENS = 50
        TABLE_FORMAT = "Pandas DataFrame as plain text"

        prompt = f"""
    System :
    You are a content writer for the badminton game report .
    Please write the Report based on the input Table .

    # Requirements
    1. Strictly adhere to the requirements .
    2. The output must be in English .
    3. The output must be based on the input data ; do not hallucinate .
    4. The Table format is {TABLE_FORMAT}.
    5. The Report can only describe the content included in the Tables and cannot describe anything not included in the Tables .
    6. The Report must consist of only one paragraph .
    7. The number of tokens in the Report must be within {WRITE_TOKENS}.

    # Table Description
    {self.table_description}

    User :
    # Test
    ## Tables
    {table_str}
    ## Report
    """
        return self._retry_generate(prompt)

    def merge_child_texts(self, child_texts: List[str], parent_operation: str) -> str:
        if not child_texts:
            return ""

        GENERATING_TOKENS = 100
        reports_str = "\n".join([f"- {txt}" for txt in child_texts])
        prompt = f"""
    System :
    You are a content generator for the badminton game report .
    Please merge and rewrite a New Report based on the input Reports .

    # Requirements
    1. Strictly adhere to the requirements .
    2. The output must be in English .
    3. The output must be based on the input data ; do not hallucinate .
    4. The New Report must include all the content from the input Reports ; do not omit any information .
    5. The New Report must follow the order of the input Reports .
    6. The number of tokens in the New Report must be within {GENERATING_TOKENS}.

    User :
    # Test
    ## Reports
    {reports_str}
    ## New Report
    """
        return self._retry_generate(prompt)

# 修改 TreeOfReport 類別中 TextGenerator 的初始化邏輯
class TreeOfReport:
    def __init__(self, api_key: str, max_depth: int = 5, max_degree: int = 5):
        self.api_key = api_key
        self.max_depth = max_depth
        self.max_degree = max_degree

        # 載入配置檔案
        self.load_configurations()

        # 初始化組件，注意 TextGenerator 傳入 table_description
        self.content_planner = ContentPlanner(api_key)
        self.df_operator = DataFrameOperator(api_key)
        self.text_generator = TextGenerator(api_key, table_description=self.table_description)

    def load_configurations(self):
        self.table_description = read_text_file("filtered_data_description.txt")
        if not self.table_description or self.table_description == "No file available":
            self.table_description = "數據分析表格，包含各種欄位用於分析"

        self.operation_description = read_json_file("selected_operations.json")
        if isinstance(self.operation_description, list):
            self.operation_pool = [op['name'] for op in self.operation_description]
        else:
            self.operation_pool = list(self.operation_description.keys())

        print(f"載入操作池: {self.operation_pool}")

    
    def build_tree(self, root_table: pd.DataFrame) -> TreeNode:
        """建構報告樹"""
        root = TreeNode(level=0, text="資料分析報告", table=root_table, operation="root(None)")
        root.operation_history = ['root(None)']
        queue = [root]

        while queue:
            current_node = queue.pop(0)

            if current_node.operation.lower().startswith('write'):
                continue

            if current_node.level >= self.max_depth:
                write_node = self.create_child_node(current_node, 'write()')
                if write_node:
                    current_node.add_child(write_node)
                continue

            print(f"\n處理節點 - Level: {current_node.level}, Operation: {current_node.operation}")

            tables_str = current_node.table.to_string()
            operations = self.content_planner.generate_operations(
                tables=tables_str,
                table_description=self.table_description,
                operation_description=self.operation_description,
                operation_history=current_node.operation_history,
                operation_pool=self.operation_pool,
                max_depth=self.max_depth,
                max_degree=self.max_degree
            )

            print(f"生成操作: {operations}")

            for operation in operations[:self.max_degree]:
                if operation.strip():
                    child_node = self.create_child_node(current_node, operation)
                    if child_node:
                        current_node.add_child(child_node)
                        queue.append(child_node)

        self.generate_all_texts(root)
        return root
    
    def create_child_node(self, parent: TreeNode, operation: str) -> Optional[TreeNode]:
        """創建子節點"""
        try:
            # 建立新的操作歷史
            new_operation_history = parent.operation_history + [operation]
            
            # 檢查是否為 write 操作
            if operation.lower().startswith('write'):
                # Write 操作：生成文本，表格保持不變
                text = self.text_generator.generate_text_for_write_operation(
                    parent.table,
                    new_operation_history
                )
                child = TreeNode(
                    level=parent.level + 1,
                    text=text,
                    table=parent.table.copy(),
                    operation=operation
                )
                child.operation_history = new_operation_history
                print(f"創建 write 節點: {operation}")
                return child
            else:
                # 其他操作：執行數據操作
                df_info = f"Shape: {parent.table.shape}\nColumns: {list(parent.table.columns)}\nData types:\n{parent.table.dtypes.to_string()}"
                code = self.df_operator.generate_code(operation, df_info)
                
                if code:
                    result_df = self.df_operator.safe_execute(code, parent.table)
                    child = TreeNode(
                        level=parent.level + 1,
                        text="",
                        table=result_df,
                        operation=operation
                    )
                    child.operation_history = new_operation_history
                    print(f"創建數據操作節點: {operation}, 結果形狀: {result_df.shape}")
                    return child
                else:
                    print(f"無法生成操作代碼: {operation}")
                    return None
        
        except Exception as e:
            print(f"創建子節點失敗: {e}")
            return None
    
    def generate_all_texts(self, node: TreeNode):
        """遞歸生成所有節點的文本"""
        # 先處理子節點
        for child in node.children:
            self.generate_all_texts(child)
        
        # 如果是葉節點且沒有文本（非 write 操作）
        if node.is_leaf() and not node.text and node.operation and not node.operation.lower().startswith('write'):
            node.text = self.text_generator.generate_text_for_write_operation(
                node.table, 
                node.operation_history
            )
        # 如果有子節點，合併子節點的文本
        elif node.children:
            child_texts = [child.text for child in node.children if child.text.strip()]
            if child_texts:
                merged_text = self.text_generator.merge_child_texts(
                    child_texts, 
                    node.operation or "root"
                )
                if node.text:
                    node.text = node.text + "\n\n" + merged_text
                else:
                    node.text = merged_text
        print(f'節點資訊: {node.text}')
        
    def generate_report(self, node: TreeNode, level: int = 0) -> str:
            if node.level == 0:
                prompt = f"""
                根據以下分析總結，請撰寫一篇賽事數據分析報導，包含：起、承、轉、合，提供全面深入的分析。
                請用繁體中文撰寫，保持邏輯清晰，資訊準確。

                分析總結:
                {node.text}
                """
                final_text = self.text_generator._retry_generate(prompt)
                with open("tree_of_report.txt", "w", encoding="utf-8") as f:
                    f.write(final_text)
                return final_text
            else:
                print(f'generate report from not root')
                indent = "  " * level
                report = f"{indent}{'#' * (level + 1)} {node.operation or 'Root'}\n\n"

                if node.text:
                    report += f"{indent}{node.text}\n\n"

                if node.table is not None and not node.table.empty and level < 2:
                    report += f"{indent}**資料摘要:** Shape {node.table.shape}\n"
                    if len(node.table) <= 10:
                        report += f"{indent}```\n{node.table.to_string()}\n{indent}```\n\n"
                    else:
                        report += f"{indent}```\n{node.table.head().to_string()}\n{indent}```\n\n"

                for child in node.children:
                    report += self.generate_report(child, level + 1)

                return report



# ===== 主程序 =====
def main():
    """主函數 - 基於參考程式碼結構"""
    
    # 設置API密鑰
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        print("請設置 GOOGLE_API_KEY 環境變數")
        return
    
    print("Tree-of-Report for Data Analysis")
    print("="*50)
    
    print("正在載入數據...")
    
    # 讀取CSV檔案
    try:
        TABLES = pd.read_csv('filtered_set1.csv')
        print(f"成功載入CSV: {TABLES.shape[0]} 行, {TABLES.shape[1]} 列")
    except FileNotFoundError:
        print("找不到 filtered_set1.csv，使用示例數據")
        # 創建示例數據
        TABLES = pd.DataFrame({
            'type': ['A', 'B', 'A', 'C', 'B', 'A', 'C', 'B'],
            'lose_reason': ['net', 'out', 'net', 'long', 'net', 'out', 'long', 'net'],
            'getpoint_player': ['Player1', 'Player2', 'Player1', 'Player2', 'Player1', 'Player2', 'Player1', 'Player2'],
            'score': [1, 2, 1, 3, 2, 1, 4, 2]
        })
    
    # 設置參數
    MAX_DEPTH = 3  # 降低深度以便測試
    MAX_DEGREE = 3  # 降低分支度以便測試
    
    print(f"最大深度: {MAX_DEPTH}")
    print(f"最大分支度: {MAX_DEGREE}")
    
    # 初始化 Tree-of-Report
    tree_report = TreeOfReport(api_key, max_depth=MAX_DEPTH, max_degree=MAX_DEGREE)
    
    # 建構報告樹
    print("\n開始建構報告樹...")
    root = tree_report.build_tree(TABLES)
    
    # 生成最終報告
    print("\n生成最終報告...")
    final_report = tree_report.generate_report(root)
    
    # 輸出報告
    print("\n" + "="*50)
    print("TREE-OF-REPORT 最終報告")
    print("="*50)
    print(final_report)
    
    # 儲存報告
    with open('tree_of_report.md', 'w', encoding='utf-8') as f:
        f.write("# Tree-of-Report 數據分析報告\n\n")
        f.write(final_report)
    
    print("報告已儲存至 tree_of_report.md")
    
    # 清理暫存檔案
    for temp_file in ['input_tmp.csv', 'tmp.csv']:
        if os.path.exists(temp_file):
            os.remove(temp_file)

if __name__ == "__main__":
    main()

Tree-of-Report for Data Analysis
正在載入數據...
成功載入CSV: 315 行, 9 列
最大深度: 3
最大分支度: 3
載入操作池: ['write', 'select_row', 'select_column', 'group_by', 'aggregate', 'value_counts', 'crosstab', 'pivot_table', 'sort', 'calculate']

開始建構報告樹...

處理節點 - Level: 0, Operation: root(None)
正在向Gemini發送請求...
成功獲得Gemini回應
生成操作: ['select_column(roundscore_A, roundscore_B)', 'value_counts(roundscore_A)', 'value_counts(roundscore_B)']
創建數據操作節點: select_column(roundscore_A, roundscore_B), 結果形狀: (315, 2)
創建數據操作節點: value_counts(roundscore_A), 結果形狀: (315, 9)
創建數據操作節點: value_counts(roundscore_B), 結果形狀: (16, 2)

處理節點 - Level: 1, Operation: select_column(roundscore_A, roundscore_B)
正在向Gemini發送請求...
成功獲得Gemini回應
生成操作: ['value_counts(roundscore_A)', 'value_counts(roundscore_B)']
已成功將修改後的 DataFrame 儲存到 'tmp.csv'
創建數據操作節點: value_counts(roundscore_A), 結果形狀: (21, 2)
成功將 value_counts 結果儲存到 'tmp.csv'
創建數據操作節點: value_counts(roundscore_B), 結果形狀: (16, 2)

處理節點 - Level: 1, Operation: value_counts(roundscore_A)
正在向Gemini發送請求...
成功獲得G

In [None]:
import pandas as pd
import json
import google.generativeai as genai
import os
import dspy
import ast
import re
from typing import List, Dict, Any, Optional, Set
import copy
import hashlib
import logging
from datetime import datetime
import sys
import builtins
# 設置日誌
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ===== 基於參考程式碼的函數 =====
def read_text_file(file_path):
    """讀取文本文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        return "No file available"
    except Exception as e:
        logger.error(f"讀取文件錯誤: {e}")
        return "Error reading file"

def read_json_file(file_path):
    """讀取JSON文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return json.load(file)
    except FileNotFoundError:
        # 返回默認操作集合
        return [
            {"name": "select_column", "description": "選擇特定欄位"},
            {"name": "value_counts", "description": "計算值的頻次"},
            {"name": "groupby", "description": "按欄位分組"},
            {"name": "sort_values", "description": "排序數據"},
            {"name": "filter_rows", "description": "過濾行數據"},
            {"name": "write", "description": "撰寫分析文本"}
        ]

# ===== 改進的樹節點類別 =====
class TreeNode:
    """改進的樹節點類別，增加語意驗證和追蹤功能"""
    def __init__(self, level: int = 0, text: str = "", table: pd.DataFrame = None, operation: str = None):
        self.children: List['TreeNode'] = []
        self.level: int = level
        self.text: str = text
        self.table: pd.DataFrame = table if table is not None else pd.DataFrame()
        self.operation: str = operation
        self.parent: Optional['TreeNode'] = None
        self.operation_history: List[str] = []
        
        # 新增屬性用於改進功能
        self.node_id: str = self._generate_node_id()
        self.created_at: datetime = datetime.now()
        self.validation_errors: List[str] = []
        self.table_hash: str = self._calculate_table_hash()
        self.semantic_score: float = 0.0
        
    def _generate_node_id(self) -> str:
        """生成唯一節點ID"""
        content = f"{self.level}_{self.operation}_{datetime.now().isoformat()}"
        return hashlib.md5(content.encode()).hexdigest()[:8]
        
    def _calculate_table_hash(self) -> str:
        """計算表格內容的哈希值，用於檢測重複"""
        if self.table.empty:
            return ""
        try:
            return hashlib.md5(str(self.table.values.tobytes()).encode()).hexdigest()[:8]
        except:
            return ""
    
    def add_child(self, child: 'TreeNode'):
        """添加子節點並進行驗證"""
        if self._validate_child(child):
            child.parent = self
            self.children.append(child)
            logger.info(f"添加子節點: {child.node_id} to {self.node_id}")
        else:
            logger.warning(f"子節點驗證失敗: {child.validation_errors}")
    
    def _validate_child(self, child: 'TreeNode') -> bool:
        """驗證子節點的合理性"""
        errors = []
        
        # 檢查是否有重複的表格狀態
        if child.table_hash and child.table_hash == self.table_hash:
            if not child.operation.lower().startswith('write'):
                errors.append("表格內容未發生變化但非寫作操作")
        
        # 檢查操作是否邏輯合理
        if self._is_redundant_operation(child.operation):
            errors.append(f"檢測到冗餘操作: {child.operation}")
        
        child.validation_errors = errors
        return len(errors) == 0
    
    def _is_redundant_operation(self, operation: str) -> bool:
        """檢查操作是否冗餘"""
        if len(self.operation_history) < 2:
            return False
            
        # 檢查是否有相同操作在近期歷史中
        recent_ops = self.operation_history[-3:]  # 檢查最近3個操作
        op_name = operation.split('(')[0].lower()
        
        for hist_op in recent_ops:
            if hist_op.split('(')[0].lower() == op_name:
                return True
        return False
    
    def is_leaf(self) -> bool:
        """判斷是否為葉節點"""
        return len(self.children) == 0
    
    def to_dict(self) -> Dict[str, Any]:
        """將節點轉換為字典格式，用於可視化"""
        return {
            "node_id": self.node_id,
            "level": self.level,
            "operation": self.operation,
            "text_preview": self.text[:100] + "..." if len(self.text) > 100 else self.text,
            "table_shape": list(self.table.shape) if not self.table.empty else [0, 0],
            "table_columns": list(self.table.columns) if not self.table.empty else [],
            "children_count": len(self.children),
            "validation_errors": self.validation_errors,
            "semantic_score": self.semantic_score,
            "created_at": self.created_at.isoformat(),
            "table_hash": self.table_hash
        }

# ===== 改進的操作解析器 =====
class OperationParser:
    """專門負責解析和驗證操作的類別"""
    
    def __init__(self):
        self.valid_operations = {
            'select_column', 'select_row',  'sort', 'calculate',
            'group_by', 'value_counts', 'aggregate', 'crosstab','pivot_table', 'write'
        }
        
    def parse_operations(self, response_text: str) -> List[Dict[str, Any]]:
        """改進的操作解析，返回結構化結果"""
        try:
            parsed_operations = []
            
            # 多種解析策略
            operations = self._extract_operations_multiple_strategies(response_text)
            
            for op_str in operations:
                parsed_op = self._parse_single_operation(op_str)
                if parsed_op and self._validate_operation(parsed_op):
                    parsed_operations.append(parsed_op)
                else:
                    logger.warning(f"無效操作被忽略: {op_str}")
            
            return parsed_operations[:5]  # 限制最多5個操作
            
        except Exception as e:
            logger.error(f"解析操作失敗: {e}")
            return []
    
    def _extract_operations_multiple_strategies(self, text: str) -> List[str]:
        """使用多種策略提取操作"""
        operations = []
        
        # 策略1: 尋找方括號內容
        bracket_match = re.search(r'\[(.*?)\]', text, re.DOTALL)
        if bracket_match:
            content = bracket_match.group(1)
            # 使用正則提取函數調用格式
            pattern = r'([a-zA-Z_]+\([^)]*\))'
            ops = re.findall(pattern, content)
            operations.extend(ops)
        
        # 策略2: 逐行解析
        if not operations:
            lines = text.split('\n')
            for line in lines:
                line = line.strip()
                if line and not line.startswith('#') and '(' in line and ')' in line:
                    operations.append(line)
        
        # 策略3: 逗號分割
        if not operations:
            parts = text.replace('[', '').replace(']', '').split(',')
            for part in parts:
                part = part.strip()
                if part and '(' in part:
                    operations.append(part)
        
        return operations
    
    def _parse_single_operation(self, op_str: str) -> Optional[Dict[str, Any]]:
        """解析單個操作字符串"""
        try:
            # 移除多餘的字符
            op_str = op_str.strip().rstrip(',').strip()
            
            # 提取操作名稱和參數
            if '(' not in op_str:
                return {"name": op_str, "args": [], "raw": op_str}
            
            name_part = op_str.split('(')[0].strip()
            args_part = op_str[op_str.find('(')+1:op_str.rfind(')')].strip()
            
            # 解析參數
            args = []
            if args_part:
                # 簡單的參數分割（可以進一步改進）
                for arg in args_part.split(','):
                    arg = arg.strip().strip('\'"')
                    if arg:
                        args.append(arg)
            
            return {
                "name": name_part.lower(),
                "args": args,
                "raw": op_str
            }
            
        except Exception as e:
            logger.error(f"解析操作 '{op_str}' 失敗: {e}")
            return None
    
    def _validate_operation(self, operation: Dict[str, Any]) -> bool:
        """驗證操作的有效性"""
        name = operation.get("name", "").lower()
        
        # 檢查操作名稱是否有效
        if name not in self.valid_operations:
            logger.warning(f"未知操作: {name}")
            return False
        
        # 檢查特定操作的參數
        args = operation.get("args", [])
        
        if name in ['select_column', 'sort_values'] and not args:
            logger.warning(f"{name} 操作需要參數")
            return False
        
        return True

# ===== 改進的內容規劃器 =====
class ContentPlanner:
    def __init__(self, api_key):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.parser = OperationParser()
        
    def generate_operations(self, tables, table_description, operation_description, 
                          operation_history, operation_pool, max_depth=5, max_degree=3, outline_path='main.txt'):
        """
        改進的操作生成，加入重複檢測和語意驗證
        """
        
        # 檢測近期操作，避免重複
        recent_operations = self._extract_recent_operations(operation_history)
        
        # 構建改進的提示詞
        prompt = f"""System : You are a content planner for the report. Please follow the outline. Please select candidate Operations and corresponding Arguments from the Operation Pool based on the input Tables and Operation History. These candidate Operations will be the next Operation in the Operation History .

# Requirements
1. Strictly adhere to the requirements .
2. The output must be in English .
3. The output must be based on the input data ; do not hallucinate .
4. The length of Operation History must be less than or equal to {max_depth}.
5. The number of Operations must be less than or equal to {max_degree}  and more than zero.
6. Only select Opertions from the Operation Pool .
7. Arguments must match the format required by the corresponding Operations .
8. Operations & Arguments must follow this format : [ operation_1 ( argument_1 , ...) , operation_2 ( argument_2 , ...) , operation_3 ( argument_3 , ...) , ...]
9. Only output Operations & Arguments !
10. If Table is big or Level is low, it should be more Operations include select_col or select_row not write.
11. If the length of Operation History is short, then more operations or more arguments.
12. Write operations do not need argument.
13. AVOID repeating recent operations: {recent_operations}
14. Prioritize operations that will meaningfully transform the data.
15. Avoid give the arguments that not match by the operation.

#outline
{read_text_file(outline_path) if os.path.exists(outline_path) else "Generate comprehensive data analysis"}

# Table Description
{table_description}

# Operation Description
{json.dumps(operation_description, indent=2, ensure_ascii=False)}

User : # Test
## Tables
{tables}

## Operation History
{operation_history}

## Operation Pool
{operation_pool}

## Operations & Arguments"""

        try:
            logger.info("正在向Gemini發送請求...")
            response = self.model.generate_content(prompt)
            
            if response.text:
                logger.info("成功獲得Gemini回應")
                parsed_ops = self.parser.parse_operations(response.text.strip())
                return [op["raw"] for op in parsed_ops]  # 返回原始字符串格式
            else:
                logger.warning("Gemini回應為空")
                return []
                
        except Exception as e:
            logger.error(f"Gemini API請求失敗: {e}")
            return []
    
    def _extract_recent_operations(self, operation_history: List[str]) -> List[str]:
        """提取最近的操作名稱"""
        recent = []
        for op in operation_history[-3:]:  # 最近3個操作
            if '(' in op:
                name = op.split('(')[0].strip()
                recent.append(name)
        return recent

# ===== 安全的DataFrame操作器 =====
class SafeDataFrameOperator:
    """安全的DataFrame操作器，使用AST驗證而非直接exec"""
    
    def __init__(self, api_key):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.allowed_modules = {'pandas', 'numpy', 're'}
        self.allowed_functions = {
            'pd.read_csv', 'pd.DataFrame', 'df.head', 'df.tail', 'df.sort_values',
            'df.groupby', 'df.filter', 'df.select', 'df.drop', 'df.fillna',
            'df.to_csv', 'df.value_counts', 'df.describe', 'df.info'
        }

    def generate_code(self, operation, df_info, df_path="input_tmp.csv"):
        prompt = f"""
        你是一個專業的Python資料分析助手。欄位名稱以資料欄位類型提供為主，根據以下要求生成操作DataFrame的程式碼：

        要執行的操作: {operation}

        CSV數據集: {df_path}

        資料欄位類型:
        {df_info}

        生成要求：
        1. 讀取CSV數據集，並存入DataFrame後，使用要執行的操作後，將修改後的DataFrame存入'tmp.csv'
        2. 只使用pandas基本操作，避免複雜的自定義函數
        3. 確保代碼安全，不包含文件系統操作（除了指定的CSV讀寫）
        4. 撰寫完整python code，包含錯誤處理

        輸出格式：
        ```python
        # 你的程式碼
        ```
        """
        return self._retry_generate(prompt)

    def _retry_generate(self, prompt, max_retries=2):
        """帶重試的生成請求"""
        for attempt in range(max_retries):
            try:
                response = self.model.generate_content(prompt)
                if response.text:
                    return response.text.strip()
            except Exception as e:
                logger.warning(f"生成代碼失敗 (嘗試 {attempt+1}/{max_retries}): {e}")
                if attempt < max_retries - 1:
                    import time
                    time.sleep(1)
        return ""

    def safe_execute(self, code: str, df: pd.DataFrame) -> pd.DataFrame:
        """安全執行生成的代碼"""
        try:
            # 提取代碼塊
            code_block = re.search(r'```python\n(.*?)\n```', code, re.DOTALL)
            #print(f'python code: {code_block}')
            if code_block:
                code = code_block.group(1)

            # AST安全驗證
            if not self._validate_code_safety(code):
                logger.error("代碼安全驗證失敗")
                return df

            # 寫入暫存 CSV 檔案
            df.to_csv("input_tmp.csv", index=False)

            allowed_builtin_names = [
                'int', 'float', 'str', 'bool', 'list', 'dict', 'set', 'tuple',
                'len', 'range', 'enumerate', 'zip', 'min', 'max', 'sum', 'abs',
                'print',
                'Exception', 'TypeError', 'ValueError', 'KeyError', 'IndexError',
                'FileNotFoundError', 'ZeroDivisionError', 'AttributeError', 'ImportError',
                '__import__'
            ]

            safe_globals = {
                'pd': pd,
                '__name__': '__main__',
                '__builtins__': {name: getattr(builtins, name) for name in allowed_builtin_names}
            }

            safe_locals = {}

            # 執行代碼
            exec(code, safe_globals, safe_locals)

            # 讀取結果
            if os.path.exists("tmp.csv"):
                result_df = pd.read_csv("tmp.csv")
                logger.info(f"操作成功，結果形狀: {result_df.shape}")
                return result_df
            else:
                logger.warning("未生成結果文件，返回原始DataFrame")
                return df

        except Exception as e:
            error_msg = f"執行錯誤: {str(e)}"
            print(error_msg)
            print("錯誤代碼如下：\n" + "-" * 30)
            print(code)  # ✅ 輸出造成錯誤的程式碼
            print("-" * 30)
            logger.error(error_msg)
            sys.exit(1)



    def _validate_code_safety(self, code: str) -> bool:
        """使用AST驗證代碼安全性"""
        try:
            tree = ast.parse(code)
            
            for node in ast.walk(tree):
                # 檢查危險的函數調用
                if isinstance(node, ast.Call):
                    if isinstance(node.func, ast.Name):
                        func_name = node.func.id
                        if func_name in ['exec', 'eval', 'compile', '__import__', 'open']:
                            logger.error(f"檢測到危險函數: {func_name}")
                            return False
                
                # 檢查文件操作（除了允許的CSV操作）
                if isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute):
                    if hasattr(node.func, 'attr'):
                        attr_name = node.func.attr
                        if attr_name in ['system', 'popen', 'subprocess']:
                            logger.error(f"檢測到系統調用: {attr_name}")
                            return False
                
                # 檢查導入語句
                if isinstance(node, ast.Import):
                    for alias in node.names:
                        if alias.name not in self.allowed_modules:
                            logger.error(f"檢測到不允許的模組導入: {alias.name}")
                            return False
            
            return True
            
        except SyntaxError as e:
            logger.error(f"代碼語法錯誤: {e}")
            return False
        except Exception as e:
            logger.error(f"AST驗證失敗: {e}")
            return False

# ===== 文本生成器 =====
import time

class TextGenerator:
    def __init__(self, api_key, table_description=""):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.table_description = table_description

    def extract_highlights_from_table(self, table: pd.DataFrame) -> str:
        try:
            if 'lose_reason' in table.columns:
                top_reason = table['lose_reason'].value_counts().idxmax()
            else:
                top_reason = "無資料"
            if 'getpoint_player' in table.columns:
                top_player = table['getpoint_player'].value_counts().idxmax()
            else:
                top_player = "未知球員"
            return f"最多失分原因為「{top_reason}」，得分最多的是 {top_player}。"
        except:
            return ""

    def extract_table_features(self, table: pd.DataFrame) -> str:
        summary = []
        for col in table.columns:
            dtype = str(table[col].dtype)
            line = f"欄位「{col}」類型：{dtype}"

            # 顯示常見值僅限類別型欄位
            if table[col].nunique() <= 10 or dtype == 'object' or pd.api.types.is_categorical_dtype(table[col]):
                top_values = table[col].value_counts().head(3).to_dict()
                line += f"，常見值：{list(top_values.keys())}"
            summary.append(line)
        return "\n".join(summary)

    def _retry_generate(self, prompt, max_retries=3, delay_seconds=30):
        for attempt in range(max_retries):
            try:
                response = self.model.generate_content(prompt)
                if response.text:
                    return response.text.strip()
            except Exception as e:
                err = str(e)
                logger.error(f"Gemini 回應失敗: {err}")
                if "429" in err:
                    logger.info(f"已達配額限制，等待 {delay_seconds} 秒後重試 ({attempt+1}/{max_retries})...")
                    time.sleep(delay_seconds)
                else:
                    break
        return "⚠️ 寫作請求失敗：API 限制或其他錯誤"

    def generate_text_for_write_operation(self, table: pd.DataFrame, operation_history: List[str]) -> str:
        table_str = table.to_string()
        WRITE_TOKENS = 50
        TABLE_FORMAT = "Pandas DataFrame as plain text"
        highlight_summary = self.extract_highlights_from_table(table)
        table_feature_summary = self.extract_table_features(table)

        prompt = f"""
System :
You are a professional content writer for the badminton game report .
Please write the Report based on the input Table, just pick one or two lightspots.

# Requirements
1. Strictly adhere to the requirements .
2. The output must be in 中文 .
3. The output must be based on the input data ; do not hallucinate .
4. The Table format is {TABLE_FORMAT}.
5. The Report can only describe the content included in the Tables and cannot describe anything not included in the Tables .
6. The Report must consist of only one paragraph .
7. The number of tokens in the Report must be within {WRITE_TOKENS}.
8. 請專注描述得分與失分模式、關鍵欄位趨勢或球員亮點。
9. 請模仿比賽轉播員或教練的語氣描述，句式自然、有節奏感。
10. 請特別觀察球種之間的連續轉換，例如 放小球 接 殺球 等，找出其中有效得分或不尋常的組合並描述。

# Highlights Summary
{highlight_summary}

# Table Features
{table_feature_summary}

# Table Description
{self.table_description}

User :
# Test
## Tables
{table_str}
## Report
"""
        return self._retry_generate(prompt)

    def merge_child_texts(self, child_texts: List[str], parent_operation: str) -> str:
        if not child_texts:
            return ""

        GENERATING_TOKENS = 100
        reports_str = "\n".join([f"- {txt}" for txt in child_texts])
        prompt = f"""
System :
You are a content generator for the badminton game report .
Please merge and rewrite a New Report based on the input Reports .

# Requirements
1. Strictly adhere to the requirements .
2. The output must be in 中文 .
3. The output must be based on the input data ; do not hallucinate .
4. The New Report must include all the content from the input Reports ; do not omit any information .
5. The New Report must follow the order of the input Reports .
6. The number of tokens in the New Report must be within {GENERATING_TOKENS}.
7. 請依序整合每段內容，形成結構清晰的段落，包括亮點、失誤模式與球員貢獻。

User :
# Test
## Reports
{reports_str}
## New Report
"""
        return self._retry_generate(prompt)

# ===== OperationParser._validate_operation 強化參數驗證（補入 df 欄位比對） =====
def validate_operation_with_columns(operation: Dict[str, Any], df_columns: List[str]) -> bool:
    name = operation.get("name", "").lower()
    args = operation.get("args", [])

    # 檢查操作名稱是否有效
    if name not in {
        'select_column', 'select_row', 'sort', 'calculate',
        'group_by', 'value_counts', 'aggregate', 'crosstab', 'pivot_table', 'write'
    }:
        return False

    # 僅針對需參數操作檢查欄位
    if name in ['select_column', 'sort', 'group_by']:
        for arg in args:
            if arg not in df_columns:
                return False

    return True



# ===== 改進的TreeOfReport類別 =====
class TreeOfReport:
    def __init__(self, api_key: str, max_depth: int = 5, max_degree: int = 5):
        self.api_key = api_key
        self.max_depth = max_depth
        self.max_degree = max_degree

        # 載入配置檔案
        self.load_configurations()

        # 初始化改進的組件
        self.content_planner = ContentPlanner(api_key)
        self.df_operator = SafeDataFrameOperator(api_key)  # 使用安全版本
        self.text_generator = TextGenerator(api_key, table_description=self.table_description)
        
        # 新增追蹤功能
        self.execution_log: List[Dict[str, Any]] = []
        self.node_registry: Dict[str, TreeNode] = {}

    def load_configurations(self):
        self.table_description = read_text_file("filtered_data _description.txt")
        if not self.table_description or self.table_description == "No file available":
            self.table_description = "數據分析表格，包含各種欄位用於分析"

        self.operation_description = read_json_file("selected_operations.json")
        if isinstance(self.operation_description, list):
            self.operation_pool = [op['name'] for op in self.operation_description]
        else:
            self.operation_pool = list(self.operation_description.keys())

        logger.info(f"載入操作池: {self.operation_pool}")

    def build_tree(self, root_table: pd.DataFrame) -> TreeNode:
        """改進的樹構建，加入完整的追蹤和驗證"""
        root = TreeNode(level=0, text="資料分析報告", table=root_table, operation="root(None)")
        root.operation_history = ['root(None)']
        self.node_registry[root.node_id] = root
        
        queue = [root]
        
        while queue:
            current_node = queue.pop(0)
            
            # 記錄處理日誌
            self._log_node_processing(current_node)

            if current_node.operation.lower().startswith('write'):
                continue

            if current_node.level >= self.max_depth:
                write_node = self.create_child_node(current_node, 'write()')
                if write_node:
                    current_node.add_child(write_node)
                continue

            logger.info(f"處理節點 - Level: {current_node.level}, Operation: {current_node.operation}")

            tables_str = current_node.table.to_string()
            operations = self.content_planner.generate_operations(
                tables=tables_str,
                table_description=self.table_description,
                operation_description=self.operation_description,
                operation_history=current_node.operation_history,
                operation_pool=self.operation_pool,
                max_depth=self.max_depth,
                max_degree=self.max_degree
            )

            logger.info(f"生成操作: {operations}")

            for operation in operations[:self.max_degree]:
                if operation.strip():
                    child_node = self.create_child_node(current_node, operation)
                    if child_node:
                        current_node.add_child(child_node)
                        queue.append(child_node)

        self.generate_all_texts(root)
        return root
    
    def _log_node_processing(self, node: TreeNode):
        """記錄節點處理日誌"""
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "node_id": node.node_id,
            "level": node.level,
            "operation": node.operation,
            "table_shape": list(node.table.shape) if not node.table.empty else [0, 0],
            "validation_errors": node.validation_errors
        }
        self.execution_log.append(log_entry)
    
    def create_child_node(self, parent: TreeNode, operation: str) -> Optional[TreeNode]:
        """改進的子節點創建，加入完整驗證"""
        try:
            # 建立新的操作歷史
            new_operation_history = parent.operation_history + [operation]
            
            # 檢查是否為 write 操作
            if operation.lower().startswith('write'):
                text = self.text_generator.generate_text_for_write_operation(
                    parent.table,
                    new_operation_history
                )
                child = TreeNode(
                    level=parent.level + 1,
                    text=text,
                    table=parent.table.copy(),
                    operation=operation
                )
                child.operation_history = new_operation_history
                self.node_registry[child.node_id] = child
                logger.info(f"創建 write 節點: {operation}")
                return child
            else:
                # 其他操作：執行數據操作
                df_info = f"Shape: {parent.table.shape}\nColumns: {list(parent.table.columns)}\nData types:\n{parent.table.dtypes.to_string()}"
                code = self.df_operator.generate_code(operation, df_info)
                
                if code:
                    result_df = self.df_operator.safe_execute(code, parent.table)
                    child = TreeNode(
                        level=parent.level + 1,
                        text="",
                        table=result_df,
                        operation=operation
                    )
                    child.operation_history = new_operation_history
                    self.node_registry[child.node_id] = child
                    logger.info(f"創建數據操作節點: {operation}, 結果形狀: {result_df.shape}")
                    return child
                else:
                    logger.warning(f"無法生成操作代碼: {operation}")
                    return None
        
        except Exception as e:
            logger.error(f"創建子節點失敗: {e}")
            return None
    
    def generate_all_texts(self, node: TreeNode):
        """遞歸生成所有節點的文本"""
        for child in node.children:
            self.generate_all_texts(child)
        
        if node.is_leaf() and not node.text and node.operation and not node.operation.lower().startswith('write'):
            node.text = self.text_generator.generate_text_for_write_operation(
                node.table, 
                node.operation_history
            )
            print(f'node table: {node.table}')
        elif node.children:
            child_texts = [child.text for child in node.children if child.text.strip()]
            if child_texts:
                merged_text = self.text_generator.merge_child_texts(
                    child_texts, 
                    node.operation or "root"
                )
                if node.text:
                    node.text = node.text + "\n\n" + merged_text
                else:
                    node.text = merged_text
        logger.info(f'節點 {node.node_id} 文本生成完成')
        print(f'node.table: {node.table}')
        print(f'節點文本: {node.text}')
        
    def export_tree_structure(self, root: TreeNode, output_path: str = "tree_structure.json"):
        """導出樹結構為JSON格式，用於可視化和分析"""
        def node_to_dict(node: TreeNode) -> Dict[str, Any]:
            result = node.to_dict()
            result["children"] = [node_to_dict(child) for child in node.children]
            return result
        
        tree_data = {
            "metadata": {
                "export_time": datetime.now().isoformat(),
                "total_nodes": len(self.node_registry),
                "max_depth": self.max_depth,
                "max_degree": self.max_degree
            },
            "execution_log": self.execution_log,
            "tree": node_to_dict(root)
        }
        
        try:
            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(tree_data, f, indent=2, ensure_ascii=False)
            logger.info(f"樹結構已導出至: {output_path}")
        except Exception as e:
            logger.error(f"導出樹結構失敗: {e}")
    
    def generate_execution_report(self) -> str:
        """生成執行過程報告"""
        total_nodes = len(self.node_registry)
        error_nodes = sum(1 for node in self.node_registry.values() if node.validation_errors)
        
        report = f"""
# Tree-of-Report 執行報告

## 統計信息
- 總節點數: {total_nodes}
- 錯誤節點數: {error_nodes}
- 樹最大深度: {self.max_depth}
- 最大分支度: {self.max_degree}

## 節點分布
"""
        
        # 按層級統計節點
        level_counts = {}
        for node in self.node_registry.values():
            level = node.level
            level_counts[level] = level_counts.get(level, 0) + 1
        
        for level, count in sorted(level_counts.items()):
            report += f"- Level {level}: {count} 個節點\n"
        
        # 錯誤摘要
        if error_nodes > 0:
            report += "\n## 驗證錯誤摘要\n"
            for node in self.node_registry.values():
                if node.validation_errors:
                    report += f"- 節點 {node.node_id} ({node.operation}): {'; '.join(node.validation_errors)}\n"
        
        return report

    def generate_report(self, node: TreeNode, level: int = 0) -> str:
        """改進的報告生成"""
        if node.level == 0:
            prompt = f"""
            你是一位新聞記者，根據以下分析總結，請撰寫一篇賽事新聞報導，提供全面深入的分析，統整成新聞報導，文辭中過多直接使用欄位名稱與直接次數統計，用player_A與player_B表示兩球員，用生動的文句描述，勿出現累贅的句子，請從分析總結中提取轉換，禁止出現幻覺。
            請用繁體中文撰寫，保持邏輯清晰，資訊準確。

            分析總結:
            {node.text}
            """
            final_text = self.text_generator._retry_generate(prompt)
            
            # 保存多種格式的報告
            with open("tree_of_report.txt", "w", encoding="utf-8") as f:
                f.write(final_text)
            
            # 導出樹結構
            self.export_tree_structure(node)
            
            # 生成執行報告
            exec_report = self.generate_execution_report()
            with open("execution_report.md", "w", encoding="utf-8") as f:
                f.write(exec_report)
                print("finish generate report")
            
            return final_text
        else:
            logger.info(f'generate report from not root')
            indent = "  " * level
            report = f"{indent}{'#' * (level + 1)} {node.operation or 'Root'}\n\n"

            if node.text:
                report += f"{indent}{node.text}\n\n"

            if node.table is not None and not node.table.empty and level < 2:
                report += f"{indent}**資料摘要:** Shape {node.table.shape}\n"
                if len(node.table) <= 10:
                    report += f"{indent}```\n{node.table.to_string()}\n{indent}```\n\n"
                else:
                    report += f"{indent}```\n{node.table.head().to_string()}\n{indent}```\n\n"

            for child in node.children:
                report += self.generate_report(child, level + 1)

            return report


# ===== 主程序 =====
def main():
    """改進的主函數"""
    
    # 設置API密鑰
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        logger.error("請設置 GOOGLE_API_KEY 環境變數")
        return
    
    logger.info("Tree-of-Report for Data Analysis (改進版)")
    logger.info("="*50)
    
    logger.info("正在載入數據...")
    
    # 讀取CSV檔案
    try:
        TABLES = pd.read_csv('filtered_set1.csv')
        logger.info(f"成功載入CSV: {TABLES.shape[0]} 行, {TABLES.shape[1]} 列")
    except FileNotFoundError:
        logger.warning("找不到 filtered_set1.csv，使用示例數據")
        # 創建示例數據
        TABLES = pd.DataFrame({
            'type': ['A', 'B', 'A', 'C', 'B', 'A', 'C', 'B'],
            'lose_reason': ['net', 'out', 'net', 'long', 'net', 'out', 'long', 'net'],
            'getpoint_player': ['Player1', 'Player2', 'Player1', 'Player2', 'Player1', 'Player2', 'Player1', 'Player2'],
            'score': [1, 2, 1, 3, 2, 1, 4, 2]
        })
    
    # 設置參數
    MAX_DEPTH = 3
    MAX_DEGREE = 4
    
    logger.info(f"最大深度: {MAX_DEPTH}")
    logger.info(f"最大分支度: {MAX_DEGREE}")
    
    # 初始化改進的 Tree-of-Report
    tree_report = TreeOfReport(api_key, max_depth=MAX_DEPTH, max_degree=MAX_DEGREE)
    
    # 建構報告樹
    logger.info("開始建構報告樹...")
    start_time = datetime.now()
    
    try:
        root = tree_report.build_tree(TABLES)
        
        # 生成最終報告
        logger.info("生成最終報告...")
        final_report = tree_report.generate_report(root)
        
        # 輸出報告
        logger.info("\n" + "="*50)
        logger.info("TREE-OF-REPORT 最終報告")
        logger.info("="*50)
        print(final_report)
        
        # 儲存報告
        with open('tree_of_report.md', 'w', encoding='utf-8') as f:
            f.write("# Tree-of-Report 數據分析報告 (改進版)\n\n")
            f.write(final_report)
        
        end_time = datetime.now()
        duration = (end_time - start_time).total_seconds()
        
        logger.info(f"報告生成完成，耗時: {duration:.2f} 秒")
        logger.info("生成的文件:")
        logger.info("- tree_of_report.md: 最終報告")
        logger.info("- tree_of_report.txt: 純文本報告")
        logger.info("- tree_structure.json: 樹結構數據")
        logger.info("- execution_report.md: 執行過程報告")
        logger.info("- tree_visualization.html: 可視化頁面")
        
    except Exception as e:
        logger.error(f"程序執行失敗: {e}")
        import traceback
        traceback.print_exc()
    
    finally:
        # 清理暫存檔案
        for temp_file in ['input_tmp.csv', 'tmp.csv']:
            if os.path.exists(temp_file):
                try:
                    os.remove(temp_file)
                    logger.info(f"清理暫存檔案: {temp_file}")
                except:
                    pass

if __name__ == "__main__":
    main()

2025-06-06 11:24:11,808 - INFO - Tree-of-Report for Data Analysis (改進版)
2025-06-06 11:24:11,809 - INFO - 正在載入數據...
2025-06-06 11:24:11,812 - INFO - 成功載入CSV: 315 行, 9 列
2025-06-06 11:24:11,812 - INFO - 最大深度: 3
2025-06-06 11:24:11,812 - INFO - 最大分支度: 4
2025-06-06 11:24:11,814 - INFO - 載入操作池: ['write', 'select_row', 'select_column', 'group_by', 'aggregate', 'value_counts', 'crosstab', 'pivot_table', 'sort', 'calculate']
2025-06-06 11:24:11,815 - INFO - 開始建構報告樹...
2025-06-06 11:24:11,815 - INFO - 處理節點 - Level: 0, Operation: root(None)
2025-06-06 11:24:11,822 - INFO - 正在向Gemini發送請求...
2025-06-06 11:24:13,400 - INFO - 成功獲得Gemini回應
2025-06-06 11:24:13,400 - INFO - 生成操作: ['select_column(player,type,lose_reason,getpoint_player)', 'value_counts(type)', 'value_counts(lose_reason)', 'value_counts(getpoint_player)']
2025-06-06 11:24:16,452 - INFO - 操作成功，結果形狀: (315, 4)
2025-06-06 11:24:16,453 - INFO - 創建數據操作節點: select_column(player,type,lose_reason,getpoint_player), 結果形狀: (315, 4)
2025-06-06 11:24:1

DataFrame已成功保存到 tmp.csv


2025-06-06 11:24:18,701 - INFO - 操作成功，結果形狀: (18, 2)
2025-06-06 11:24:18,702 - INFO - 創建數據操作節點: value_counts(type), 結果形狀: (18, 2)
2025-06-06 11:24:18,703 - INFO - 添加子節點: 6766c7a8 to a46eb399


value_counts('type') 操作完成，結果已儲存到 tmp.csv


2025-06-06 11:24:20,505 - INFO - 操作成功，結果形狀: (4, 2)
2025-06-06 11:24:20,506 - INFO - 創建數據操作節點: value_counts(lose_reason), 結果形狀: (4, 2)
2025-06-06 11:24:20,507 - INFO - 添加子節點: 6b604cfd to a46eb399


value_counts 操作已完成，結果已保存到 tmp.csv


2025-06-06 11:24:22,720 - INFO - 操作成功，結果形狀: (2, 2)
2025-06-06 11:24:22,720 - INFO - 創建數據操作節點: value_counts(getpoint_player), 結果形狀: (2, 2)
2025-06-06 11:24:22,720 - INFO - 添加子節點: ad3b8632 to a46eb399
2025-06-06 11:24:22,721 - INFO - 處理節點 - Level: 1, Operation: select_column(player,type,lose_reason,getpoint_player)
2025-06-06 11:24:22,724 - INFO - 正在向Gemini發送請求...


value_counts 操作已成功執行並保存到 'tmp.csv'


2025-06-06 11:24:23,745 - INFO - 成功獲得Gemini回應
2025-06-06 11:24:23,745 - INFO - 生成操作: ['value_counts(type)', 'value_counts(lose_reason)']
2025-06-06 11:24:25,559 - INFO - 操作成功，結果形狀: (18, 2)
2025-06-06 11:24:25,561 - INFO - 創建數據操作節點: value_counts(type), 結果形狀: (18, 2)
2025-06-06 11:24:25,561 - INFO - 添加子節點: 1a48c3b5 to 68962585


value_counts('type') 操作已完成，结果已保存到 tmp.csv


2025-06-06 11:24:27,829 - INFO - 操作成功，結果形狀: (4, 2)
2025-06-06 11:24:27,830 - INFO - 創建數據操作節點: value_counts(lose_reason), 結果形狀: (4, 2)
2025-06-06 11:24:27,830 - INFO - 添加子節點: fe370ae8 to 68962585
2025-06-06 11:24:27,831 - INFO - 處理節點 - Level: 1, Operation: value_counts(type)
2025-06-06 11:24:27,832 - INFO - 正在向Gemini發送請求...


value_counts('lose_reason') 操作成功，结果已保存到 tmp.csv


2025-06-06 11:24:28,569 - INFO - 成功獲得Gemini回應
2025-06-06 11:24:28,572 - INFO - 生成操作: ['sort(count)']
2025-06-06 11:24:30,259 - INFO - 操作成功，結果形狀: (18, 2)
2025-06-06 11:24:30,261 - INFO - 創建數據操作節點: sort(count), 結果形狀: (18, 2)
2025-06-06 11:24:30,261 - INFO - 添加子節點: d7445d2b to 6766c7a8
2025-06-06 11:24:30,261 - INFO - 處理節點 - Level: 1, Operation: value_counts(lose_reason)
2025-06-06 11:24:30,263 - INFO - 正在向Gemini發送請求...


DataFrame已成功排序並儲存至 tmp.csv


2025-06-06 11:24:31,118 - INFO - 成功獲得Gemini回應
2025-06-06 11:24:31,118 - INFO - 生成操作: ['select_column(lose_reason, count)', 'sort(count, ascending=False)', 'write()']
2025-06-06 11:24:33,906 - INFO - 操作成功，結果形狀: (4, 2)
2025-06-06 11:24:33,906 - INFO - 創建數據操作節點: select_column(lose_reason, count), 結果形狀: (4, 2)
2025-06-06 11:24:33,908 - INFO - 添加子節點: 496577ff to 6b604cfd


Successfully processed 'input_tmp.csv' and saved the result to 'tmp.csv'


2025-06-06 11:24:35,569 - INFO - 操作成功，結果形狀: (4, 2)
2025-06-06 11:24:35,569 - INFO - 創建數據操作節點: sort(count, ascending=False), 結果形狀: (4, 2)
2025-06-06 11:24:35,570 - INFO - 添加子節點: 50728b8d to 6b604cfd


CSV檔案已成功讀取、排序並儲存為 'tmp.csv'


2025-06-06 11:24:36,444 - INFO - 創建 write 節點: write()
2025-06-06 11:24:36,445 - INFO - 添加子節點: cdb60d60 to 6b604cfd
2025-06-06 11:24:36,446 - INFO - 處理節點 - Level: 1, Operation: value_counts(getpoint_player)
2025-06-06 11:24:36,447 - INFO - 正在向Gemini發送請求...
2025-06-06 11:24:37,202 - INFO - 成功獲得Gemini回應
2025-06-06 11:24:37,204 - INFO - 生成操作: ['write()']
2025-06-06 11:24:38,277 - INFO - 創建 write 節點: write()
2025-06-06 11:24:38,277 - INFO - 添加子節點: 7711d075 to ad3b8632
2025-06-06 11:24:38,278 - INFO - 處理節點 - Level: 2, Operation: value_counts(type)
2025-06-06 11:24:38,279 - INFO - 正在向Gemini發送請求...
2025-06-06 11:24:38,625 - ERROR - Gemini API請求失敗: 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quot

node table:      type  count
0      長球     55
1      殺球     36
2      挑球     35
3      切球     31
4      推球     31
5     放小球     28
6     擋小球     20
7    未知球種     16
8      勾球     12
9     發長球     10
10    發短球     10
11  後場抽平球      7
12   過度切球      6
13   防守回抽      5
14     撲球      5
15     點扣      4
16   防守回挑      2
17     平球      2
node.table:      type  count
0      長球     55
1      殺球     36
2      挑球     35
3      切球     31
4      推球     31
5     放小球     28
6     擋小球     20
7    未知球種     16
8      勾球     12
9     發長球     10
10    發短球     10
11  後場抽平球      7
12   過度切球      6
13   防守回抽      5
14     撲球      5
15     點扣      4
16   防守回挑      2
17     平球      2
節點文本: 場上局勢膠著，雙方你來我往。長球使用次數最多，高達55次，但未知球員得分效率驚人。殺球緊隨其後，有36次，可見進攻端火力十足！


2025-06-06 11:25:12,487 - INFO - 節點 fe370ae8 文本生成完成


node table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
node.table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
節點文本: 比賽中，可見對手落地得分與我方出界是主要失分因素，各佔12分，掛網失誤也不容忽視，丟失10分。減少無謂失誤，是接下來需要重點調整的方向。


2025-06-06 11:25:13,750 - INFO - 節點 68962585 文本生成完成
  if table[col].nunique() <= 10 or dtype == 'object' or pd.api.types.is_categorical_dtype(table[col]):


node.table:     player  type lose_reason getpoint_player
0        B   發長球         NaN             NaN
1        A    切球         NaN             NaN
2        B    挑球         NaN             NaN
3        A    長球         NaN             NaN
4        B    殺球         NaN             NaN
..     ...   ...         ...             ...
310      B  未知球種         NaN             NaN
311      A    切球         NaN             NaN
312      B    挑球         NaN             NaN
313      A    長球         NaN             NaN
314      B    長球          出界               A

[315 rows x 4 columns]
節點文本: 球場局勢膠著，雙方互有攻防。長球使用頻率最高，達到55次，未知球員得分效率驚人。殺球次數緊隨其後，共36次，進攻火力強勁。比賽中，對手落地得分與我方出界為主要失分因素，各佔12分，掛網失誤也造成10分丟失。減少不必要的失誤將是未來調整的重點。


2025-06-06 11:25:14,787 - INFO - 節點 d7445d2b 文本生成完成


node table:      type  count
0      平球      2
1    防守回挑      2
2      點扣      4
3    防守回抽      5
4      撲球      5
5    過度切球      6
6   後場抽平球      7
7     發長球     10
8     發短球     10
9      勾球     12
10   未知球種     16
11    擋小球     20
12    放小球     28
13     切球     31
14     推球     31
15     挑球     35
16     殺球     36
17     長球     55
node.table:      type  count
0      平球      2
1    防守回挑      2
2      點扣      4
3    防守回抽      5
4      撲球      5
5    過度切球      6
6   後場抽平球      7
7     發長球     10
8     發短球     10
9      勾球     12
10   未知球種     16
11    擋小球     20
12    放小球     28
13     切球     31
14     推球     31
15     挑球     35
16     殺球     36
17     長球     55
節點文本: 本場比賽雙方在長球的使用上非常頻繁，高達55次，挑球的次數也不少，有35次。另外，切球和推球的次數也旗鼓相當，分別為31次，可見這兩種技術是選手們常用的得分手段。


2025-06-06 11:25:15,625 - INFO - 節點 6766c7a8 文本生成完成


node.table:      type  count
0      長球     55
1      殺球     36
2      挑球     35
3      切球     31
4      推球     31
5     放小球     28
6     擋小球     20
7    未知球種     16
8      勾球     12
9     發長球     10
10    發短球     10
11  後場抽平球      7
12   過度切球      6
13   防守回抽      5
14     撲球      5
15     點扣      4
16   防守回挑      2
17     平球      2
節點文本: 本場比賽雙方頻繁使用長球，次數高達55次，挑球也有35次。切球和推球的使用次數相當，分別為31次，顯示這兩種技術是選手們常用的得分手段。


2025-06-06 11:25:16,534 - INFO - 節點 496577ff 文本生成完成


node table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
node.table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
節點文本: 比賽中，可見對手落地得分與出界是主要失分點，各有12次，需要重點提防。掛網失誤也有10次，不可忽視。


2025-06-06 11:25:17,737 - INFO - 節點 50728b8d 文本生成完成
2025-06-06 11:25:17,739 - INFO - 節點 cdb60d60 文本生成完成


node table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
node.table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
節點文本: 本場比賽雙方在場上爭奪激烈，可見「對手落地致勝」與「出界」是主要失分原因，各有12次之多，而「掛網」失誤也有10次，選手需多加注意。
node.table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
節點文本: 場上局勢膠著，雙方互有攻防。可見「對手落地致勝」與「出界」為主要失分因素，各位球員需要多加留意。


2025-06-06 11:25:18,620 - INFO - 節點 6b604cfd 文本生成完成
2025-06-06 11:25:18,622 - INFO - 節點 7711d075 文本生成完成


node.table:   lose_reason  count
0      對手落地致勝     12
1          出界     12
2          掛網     10
3         未過網      2
節點文本: 本場比賽雙方爭奪激烈，局勢膠著。主要失分點為對手落地得分與出界，各有12次，需重點提防。掛網失誤亦有10次，不可忽視，球員們需多加留意。
node.table:   getpoint_player  count
0               A     21
1               B     15
節點文本: A選手進攻火力全開，拿下全場最高的21分！B選手也不甘示弱，努力追分，取得15分。雖然數據未能顯示具體的失分原因，但A選手的得分能力無疑是本場比賽的一大亮點。


2025-06-06 11:25:19,499 - INFO - 節點 ad3b8632 文本生成完成


node.table:   getpoint_player  count
0               A     21
1               B     15
節點文本: A選手進攻火力全開，以全場最高的21分領先！B選手奮力追趕，獲得15分。雖然未明確指出失分原因，但A選手的得分能力是本次比賽的亮點。


2025-06-06 11:25:20,804 - INFO - 節點 a46eb399 文本生成完成
2025-06-06 11:25:20,807 - INFO - 生成最終報告...


node.table:      Unnamed: 0  rally      time  roundscore_A  roundscore_B player  type  \
0             0      1  00:05:47             1             0      B   發長球   
1             1      1  00:05:49             1             0      A    切球   
2             2      1  00:05:50             1             0      B    挑球   
3             3      1  00:05:51             1             0      A    長球   
4             4      1  00:05:52             1             0      B    殺球   
..          ...    ...       ...           ...           ...    ...   ...   
310         310     36  00:24:44            21            15      B  未知球種   
311         311     36  00:24:58            21            15      A    切球   
312         312     36  00:25:00            21            15      B    挑球   
313         313     36  00:25:01            21            15      A    長球   
314         314     36  00:25:02            21            15      B    長球   

    lose_reason getpoint_player  
0           NaN             N

2025-06-06 11:25:24,558 - INFO - 樹結構已導出至: tree_structure.json
2025-06-06 11:25:24,559 - INFO - 
2025-06-06 11:25:24,560 - INFO - TREE-OF-REPORT 最終報告
2025-06-06 11:25:24,563 - INFO - 報告生成完成，耗時: 72.75 秒
2025-06-06 11:25:24,563 - INFO - 生成的文件:
2025-06-06 11:25:24,565 - INFO - - tree_of_report.md: 最終報告
2025-06-06 11:25:24,566 - INFO - - tree_of_report.txt: 純文本報告
2025-06-06 11:25:24,567 - INFO - - tree_structure.json: 樹結構數據
2025-06-06 11:25:24,568 - INFO - - execution_report.md: 執行過程報告
2025-06-06 11:25:24,569 - INFO - - tree_visualization.html: 可視化頁面
2025-06-06 11:25:24,570 - INFO - 清理暫存檔案: input_tmp.csv
2025-06-06 11:25:24,571 - INFO - 清理暫存檔案: tmp.csv


finish generate report
## 猛攻奏效！羽球賽事戰況膠著，A選手火力全開險勝

羽球賽事現場氣氛緊張，雙方選手你來我往，比分始終無法拉開。整場比賽可謂高來高去，長球戰術頻繁使用，高達55次，展現選手們對場地深度的高度掌握。挑球的運用也相當關鍵，共計35次，試圖擾亂對手節奏。切球與推球則如兩把利刃，各有31次的精準施放，考驗著選手的細膩手感。

然而，在這場戰術交鋒中，A選手憑藉其驚人的進攻火力脫穎而出。他猶如一頭猛獸，殺球次數高達36次，讓對手難以招架。反觀B選手，雖奮力抵抗，卻難以抵擋A選手的強勢進攻。

儘管如此，比賽並非毫無破綻。雙方選手在比賽中都出現了因對手落地得分、自身出界以及掛網等失誤，其中落地得分與出界各佔12分，掛網失誤亦有10分，突顯了減少非受迫性失誤的重要性。

最終，A選手憑藉著更勝一籌的進攻能力，以21分的佳績力壓B選手的15分，險勝對手。這場比賽不僅展現了選手們精湛的球技，也提醒著我們，在追求進攻的同時，穩紮穩打，減少不必要的失誤，才能在激烈的競爭中脫穎而出。


In [None]:
import pandas as pd
import json
import google.generativeai as genai
import os
import dspy
import ast
import re
from typing import List, Dict, Any, Optional, Set
import copy
import hashlib
import logging
from datetime import datetime
import sys
import builtins
import time

# 設置日誌
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# ===== 新增：新聞寫作風格配置 =====
class NewsStyleConfig:
    """新聞寫作風格配置類"""
    
    # 羽球專業術語對照表
    BADMINTON_TERMS = {
        'net': '網前失誤',
        'out': '出界',
        'long': '過底線',
        'smash': '殺球',
        'clear': '高遠球',
        'drop': '切球',
        'drive': '平抽球',
        'serve': '發球',
        'return': '回球'
    }
    
    # 新聞常用動詞
    ACTION_VERBS = [
        '展現', '發揮', '掌握', '運用', '施展', '控制', '主導', '壓制',
        '突破', '創造', '締造', '奠定', '確立', '鞏固', '扭轉', '逆轉'
    ]
    
    # 新聞形容詞
    DESCRIPTIVE_ADJECTIVES = [
        '精彩', '激烈', '關鍵', '致命', '精準', '穩健', '霸氣', '靈活',
        '果決', '冷靜', '強勢', '驚艷', '出色', '卓越', '完美', '絕佳'
    ]
    
    # 新聞句式模板
    SENTENCE_TEMPLATES = [
        "{player}在{situation}中{action}，{result}",
        "憑藉{skill}，{player}{achievement}",
        "{player}以{score_pattern}{victory_method}，{final_result}",
        "比賽中{player}{performance}，{impact}",
        "關鍵時刻{player}{key_action}，{outcome}"
    ]

# ===== 改進的文本生成器 =====
class EnhancedTextGenerator:
    def __init__(self, api_key, table_description=""):
        self.api_key = api_key
        genai.configure(api_key=api_key)
        self.model = genai.GenerativeModel("gemini-2.0-flash")
        self.table_description = table_description
        self.style_config = NewsStyleConfig()
        
        # 新增：比賽上下文記憶
        self.match_context = {
            'player_names': ['Player_A', 'Player_B'],
            'match_type': '羽球賽事',
            'key_moments': [],
            'performance_trends': {}
        }

    def _convert_to_json_serializable(self, obj):
        """將numpy/pandas類型轉換為JSON可序列化的Python基本類型"""
        import numpy as np
        import pandas as pd
        
        if isinstance(obj, (np.integer, np.int64)):
            return int(obj)
        elif isinstance(obj, (np.floating, np.float64)):
            return float(obj)
        elif isinstance(obj, (pd.Series, pd.Index)):
            return obj.tolist()
        elif isinstance(obj, pd.DataFrame):
            return obj.to_dict(orient='records')
        elif isinstance(obj, dict):
            return {k: self._convert_to_json_serializable(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [self._convert_to_json_serializable(item) for item in obj]
        else:
            return obj
        
    def extract_detailed_insights(self, table: pd.DataFrame) -> Dict[str, Any]:
        """提取更詳細的比賽洞察"""
        insights = {
            'basic_stats': {},
            'patterns': {},
            'key_moments': [],
            'player_performance': {},
            'tactical_analysis': {}
        }
        
        try:
            # 基本統計
            if 'lose_reason' in table.columns:
                lose_reasons = table['lose_reason'].value_counts()
                insights['basic_stats']['top_lose_reason'] = {
                    'reason': lose_reasons.index[0] if len(lose_reasons) > 0 else '未知',
                    'count': lose_reasons.iloc[0] if len(lose_reasons) > 0 else 0,
                    'percentage': round((lose_reasons.iloc[0] / len(table)) * 100, 1) if len(lose_reasons) > 0 else 0
                }
                
            if 'getpoint_player' in table.columns:
                point_winners = table['getpoint_player'].value_counts()
                insights['basic_stats']['leading_player'] = {
                    'player': point_winners.index[0] if len(point_winners) > 0 else '未知',
                    'points': point_winners.iloc[0] if len(point_winners) > 0 else 0,
                    'advantage': point_winners.iloc[0] - point_winners.iloc[1] if len(point_winners) > 1 else 0
                }
            
            # 模式分析
            if 'type' in table.columns and 'lose_reason' in table.columns:
                pattern_analysis = table.groupby('type')['lose_reason'].value_counts()
                insights['patterns']['shot_error_correlation'] = pattern_analysis.to_dict()
            
            # 關鍵時刻識別（基於分數變化）
            if 'score' in table.columns:
                score_changes = table['score'].diff().abs()
                critical_moments = table[score_changes > score_changes.quantile(0.8)]
                insights['key_moments'] = critical_moments.to_dict('records')[:3]  # 前3個關鍵時刻
            
            # 球員表現分析
            if 'getpoint_player' in table.columns:
                for player in table['getpoint_player'].unique():
                    player_data = table[table['getpoint_player'] == player]
                    insights['player_performance'][player] = {
                        'total_points': len(player_data),
                        'winning_shots': player_data['type'].value_counts().to_dict() if 'type' in table.columns else {},
                        'consistency': self._calculate_consistency(player_data)
                    }
                    
        except Exception as e:
            logger.error(f"提取洞察失敗: {e}")
            
        return self._convert_to_json_serializable(insights)

    def _calculate_consistency(self, player_data: pd.DataFrame) -> float:
        """計算球員一致性指標"""
        try:
            if 'score' in player_data.columns and len(player_data) > 1:
                score_variance = player_data['score'].var()
                return max(0, 1 - (score_variance / 10))  # 標準化一致性分數
            return 0.5  # 預設值
        except:
            return 0.5

    def generate_contextual_narrative(self, insights: Dict[str, Any], operation_history: List[str]) -> str:
        """根據洞察生成有上下文的敘述"""
        narratives = []
        insights = self._convert_to_json_serializable(insights)
        # 開場敘述
        if insights.get('basic_stats', {}).get('leading_player'):
            leader_info = insights['basic_stats']['leading_player']
            if leader_info['advantage'] > 2:
                narratives.append(f"{leader_info['player']}在本局展現強勢表現，以{leader_info['points']}分領先對手{leader_info['advantage']}分")
            else:
                narratives.append(f"雙方戰況膠著，{leader_info['player']}僅以{leader_info['advantage']}分的微幅優勢領先")
        
        # 失誤分析敘述
        if insights.get('basic_stats', {}).get('top_lose_reason'):
            error_info = insights['basic_stats']['top_lose_reason']
            error_desc = self.style_config.BADMINTON_TERMS.get(error_info['reason'], error_info['reason'])
            narratives.append(f"比賽中{error_desc}成為主要失分因素，佔總失誤的{error_info['percentage']}%")
        
        # 關鍵時刻敘述
        if insights.get('key_moments'):
            narratives.append("關鍵分數轉折點出現激烈攻防，雙方你來我往不相上下")
        
        return "，".join(narratives) + "。"

    def generate_enhanced_write_content(self, table: pd.DataFrame, operation_history: List[str]) -> str:
        """生成增強版的寫作內容"""
        
        # 提取詳細洞察
        insights = self.extract_detailed_insights(table)
        
        # 生成上下文敘述
        contextual_narrative = self.generate_contextual_narrative(insights, operation_history)
        
        table_str = table.to_string()
        WRITE_TOKENS = 80  # 增加token數量以支援更豐富內容
        
        enhanced_prompt = f"""
System: 你是專業的體育新聞記者，專精羽球賽事報導。請根據數據分析撰寫一段精彩的比賽片段描述。

# 寫作要求
1. 使用繁體中文，文筆生動活潑
2. 採用體育新聞的專業語調，避免過於技術性的用詞
3. 重點描述比賽節奏、球員表現和戰術運用
4. 字數控制在{WRITE_TOKENS}字以內
5. 避免直接引用欄位名稱，使用自然的描述方式
6. 突出比賽的戲劇性和觀賞性

# 專業術語對照
{json.dumps(self.style_config.BADMINTON_TERMS, ensure_ascii=False, indent=2)}

# 比賽數據洞察
{json.dumps(insights, ensure_ascii=False, indent=2)}

# 上下文敘述參考
{contextual_narrative}

# 原始數據表格
{table_str}

請撰寫一段引人入勝的比賽描述：
"""
        
        return self._retry_generate_with_quality_check(enhanced_prompt)

    def _retry_generate_with_quality_check(self, prompt: str, max_retries=3) -> str:
        """帶配額管理和指數退避的重試生成"""
        import random
        
        for attempt in range(max_retries):
            try:
                # 添加隨機延遲以避免突發請求
                delay = random.uniform(0.5, 2.0) * (2 ** attempt)
                time.sleep(delay)
                
                response = self.model.generate_content(prompt)
                
                if response.text:
                    text = response.text.strip()
                    quality_score = self._assess_text_quality(text)
                    
                    if quality_score >= 0.7:
                        return text
                    elif attempt < max_retries - 1:
                        logger.info(f"文本品質不達標 (分數: {quality_score:.2f})，重新生成...")
                        time.sleep(1)
                    else:
                        return text
                        
            except Exception as e:
                if "429" in str(e):  # 配額錯誤
                    wait_time = 30 + random.randint(0, 10)  # 隨機等待30-40秒
                    logger.warning(f"API配額不足，等待 {wait_time} 秒後重試...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"生成失敗 (嘗試 {attempt+1}/{max_retries}): {e}")
                    if attempt < max_retries - 1:
                        time.sleep(2)
                    
        return "⚠️ 內容生成暫時失敗，請稍後重試"

    def _assess_text_quality(self, text: str) -> float:
        """評估文本品質"""
        score = 0.0
        
        # 長度檢查 (20%)
        if 30 <= len(text) <= 120:
            score += 0.2
        
        # 專業術語使用 (20%)
        term_usage = sum(1 for term in self.style_config.BADMINTON_TERMS.values() if term in text)
        if term_usage > 0:
            score += min(0.2, term_usage * 0.1)
        
        # 動詞活躍度 (20%)
        verb_usage = sum(1 for verb in self.style_config.ACTION_VERBS if verb in text)
        if verb_usage > 0:
            score += min(0.2, verb_usage * 0.05)
        
        # 避免技術欄位名稱 (20%)
        tech_terms = ['lose_reason', 'getpoint_player', 'type', 'column', 'row']
        if not any(term in text for term in tech_terms):
            score += 0.2
        
        # 語句流暢度 (20%) - 簡單檢查標點符號
        if '，' in text or '。' in text:
            score += 0.2
        
        return min(1.0, score)

    def generate_comprehensive_final_report(self, child_texts: List[str], match_metadata: Dict[str, Any]) -> str:
        """生成綜合性最終報告"""
        
        if not child_texts:
            return "比賽數據分析完成，詳細內容請參考各階段分析。"
        
        # 整合所有子文本
        consolidated_content = "\n".join([f"• {text.strip()}" for text in child_texts if text.strip()])
        
        final_prompt = f"""
System: 你是資深體育記者，負責撰寫羽球賽事的深度報導。請根據以下分析內容，撰寫一篇完整的賽事新聞稿。

# 撰寫要求
1. 採用新聞稿格式，包含導言、內文和結語
2. 語言生動，突出比賽亮點和戲劇性
3. 避免使用技術性術語，改用讀者易懂的描述
4. 字數控制在150-200字
5. 結構清晰，邏輯順暢
6. 使用「選手A」和「選手B」稱呼球員

# 新聞寫作範例風格
"在今日的精彩對決中，選手A展現出色的網前技巧，多次運用精準的切球製造得分機會。然而選手B並未示弱，憑藉強勁的後場攻擊力，在關鍵時刻連續得分扳回劣勢。整場比賽高潮迭起，兩位選手的精彩表現讓現場觀眾大飽眼福。"

# 分析內容摘要
{consolidated_content}

請撰寫完整的賽事新聞報導：
"""
        
        return self._retry_generate_with_quality_check(final_prompt, max_retries=2)

    def merge_child_texts_enhanced(self, child_texts: List[str], parent_operation: str) -> str:
        """增強版文本合併"""
        if not child_texts:
            return ""

        # 過濾和清理文本
        clean_texts = []
        for text in child_texts:
            cleaned = text.strip()
            if cleaned and len(cleaned) > 10:  # 過濾過短的文本
                clean_texts.append(cleaned)
        
        if not clean_texts:
            return ""

        # 智能合併邏輯
        if len(clean_texts) == 1:
            return clean_texts[0]
        
        # 多段落合併
        merge_prompt = f"""
請將以下羽球比賽分析片段整合成一段連貫的描述：

{chr(10).join([f"{i+1}. {text}" for i, text in enumerate(clean_texts)])}

整合要求：
1. 保持所有關鍵信息
2. 確保邏輯順序合理
3. 語言自然流暢
4. 避免重複內容
5. 字數控制在100字以內

整合結果：
"""
        
        return self._retry_generate_with_quality_check(merge_prompt)

# ===== 改進的TreeOfReport類別 =====
class EnhancedTreeOfReport:
    def __init__(self, api_key: str, max_depth: int = 5, max_degree: int = 5):
        self.api_key = api_key
        self.max_depth = max_depth
        self.max_degree = max_degree

        # 載入配置檔案
        self.load_configurations()

        # 使用增強版文本生成器
        self.content_planner = ContentPlanner(api_key)
        self.df_operator = SafeDataFrameOperator(api_key)
        self.text_generator = EnhancedTextGenerator(api_key, table_description=self.table_description)
        
        # 新增：比賽元數據追蹤
        self.match_metadata = {
            'total_points': 0,
            'analysis_depth': 0,
            'key_insights': [],
            'generation_quality': []
        }
        
        # 追蹤功能
        self.execution_log: List[Dict[str, Any]] = []
        self.node_registry: Dict[str, TreeNode] = {}

    def load_configurations(self):
        """載入配置，與原版相同"""
        self.table_description = read_text_file("filtered_data _description.txt")
        if not self.table_description or self.table_description == "No file available":
            self.table_description = "羽球比賽數據分析表格，包含得分模式、失誤類型和球員表現等關鍵指標"

        self.operation_description = read_json_file("selected_operations.json")
        if isinstance(self.operation_description, list):
            self.operation_pool = [op['name'] for op in self.operation_description]
        else:
            self.operation_pool = list(self.operation_description.keys())

        logger.info(f"載入操作池: {self.operation_pool}")

    def create_child_node_enhanced(self, parent: TreeNode, operation: str) -> Optional[TreeNode]:
        """增強版子節點創建"""
        try:
            new_operation_history = parent.operation_history + [operation]
            
            if operation.lower().startswith('write'):
                # 使用增強版文本生成
                text = self.text_generator.generate_enhanced_write_content(
                    parent.table,
                    new_operation_history
                )
                
                child = TreeNode(
                    level=parent.level + 1,
                    text=text,
                    table=parent.table.copy(),
                    operation=operation
                )
                child.operation_history = new_operation_history
                self.node_registry[child.node_id] = child
                
                # 評估生成質量
                quality_score = self.text_generator._assess_text_quality(text)
                self.match_metadata['generation_quality'].append(quality_score)
                
                logger.info(f"創建 write 節點: {operation} (品質分數: {quality_score:.2f})")
                return child
            else:
                # 數據操作節點（與原版相同邏輯）
                df_info = f"Shape: {parent.table.shape}\nColumns: {list(parent.table.columns)}\nData types:\n{parent.table.dtypes.to_string()}"
                code = self.df_operator.generate_code(operation, df_info)
                
                if code:
                    result_df = self.df_operator.safe_execute(code, parent.table)
                    child = TreeNode(
                        level=parent.level + 1,
                        text="",
                        table=result_df,
                        operation=operation
                    )
                    child.operation_history = new_operation_history
                    self.node_registry[child.node_id] = child
                    logger.info(f"創建數據操作節點: {operation}, 結果形狀: {result_df.shape}")
                    return child
                else:
                    logger.warning(f"無法生成操作代碼: {operation}")
                    return None
        
        except Exception as e:
            logger.error(f"創建子節點失敗: {e}")
            return None

    def generate_all_texts_enhanced(self, node: TreeNode):
        """增強版文本生成"""
        # 遞歸處理子節點
        for child in node.children:
            self.generate_all_texts_enhanced(child)
        
        # 葉節點處理
        if node.is_leaf() and not node.text and node.operation and not node.operation.lower().startswith('write'):
            node.text = self.text_generator.generate_enhanced_write_content(
                node.table, 
                node.operation_history
            )
        # 非葉節點合併
        elif node.children:
            child_texts = [child.text for child in node.children if child.text.strip()]
            if child_texts:
                merged_text = self.text_generator.merge_child_texts_enhanced(
                    child_texts, 
                    node.operation or "root"
                )
                if node.text:
                    node.text = node.text + "\n\n" + merged_text
                else:
                    node.text = merged_text
        
        logger.info(f'節點 {node.node_id} 增強文本生成完成')

    def generate_final_report_enhanced(self, node: TreeNode) -> str:
        """生成增強版最終報告"""
        if node.level == 0:
            # 計算整體品質指標
            avg_quality = sum(self.match_metadata['generation_quality']) / len(self.match_metadata['generation_quality']) if self.match_metadata['generation_quality'] else 0
            
            # 使用增強版最終報告生成
            final_text = self.text_generator.generate_comprehensive_final_report(
                [node.text] if node.text else [],
                {**self.match_metadata, 'average_quality': avg_quality}
            )
            
            # 保存報告和元數據
            with open("enhanced_tree_report.txt", "w", encoding="utf-8") as f:
                f.write(final_text)
            
            with open("generation_quality_report.json", "w", encoding="utf-8") as f:
                quality_report = {
                    "average_quality": avg_quality,
                    "total_generations": len(self.match_metadata['generation_quality']),
                    "quality_scores": self.match_metadata['generation_quality'],
                    "metadata": self.match_metadata
                }
                json.dump(quality_report, f, indent=2, ensure_ascii=False)
            
            logger.info(f"最終報告生成完成，平均品質分數: {avg_quality:.2f}")
            return final_text
        else:
            # 子節點報告（與原版相同）
            return self._generate_hierarchical_report(node, 0)

    def _generate_hierarchical_report(self, node: TreeNode, level: int) -> str:
        """生成階層式報告"""
        indent = "  " * level
        report = f"{indent}{'#' * (level + 1)} {node.operation or 'Root'}\n\n"

        if node.text:
            report += f"{indent}{node.text}\n\n"

        if node.table is not None and not node.table.empty and level < 2:
            report += f"{indent}**資料摘要:** Shape {node.table.shape}\n"
            if len(node.table) <= 10:
                report += f"{indent}```\n{node.table.to_string()}\n{indent}```\n\n"
            else:
                report += f"{indent}```\n{node.table.head().to_string()}\n{indent}```\n\n"

        for child in node.children:
            report += self._generate_hierarchical_report(child, level + 1)

        return report
    
    def build_tree(self, table: pd.DataFrame) -> 'TreeNode':
        """構建報告樹的根節點"""
        root = TreeNode(level=0, text="", table=table.copy(), operation="root")
        self.node_registry[root.node_id] = root

        def expand(node: TreeNode, depth: int):
            if depth >= self.max_depth:
                return
            for operation in self.operation_pool[:self.max_degree]:
                child = self.create_child_node_enhanced(node, operation)
                if child:
                    node.children.append(child)
                    expand(child, depth + 1)

        expand(root, 0)
        return root

# ===== 主要函數保持不變，僅修改類別調用 =====
def read_text_file(file_path):
    """讀取文本文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return file.read()
    except FileNotFoundError:
        return "No file available"
    except Exception as e:
        logger.error(f"讀取文件錯誤: {e}")
        return "Error reading file"

def read_json_file(file_path):
    """讀取JSON文件"""
    try:
        with open(file_path, 'r', encoding='utf-8') as file:
            return json.load(file)
    except UnicodeDecodeError:
        with open(file_path, 'r', encoding='latin1') as file:
            return json.load(file)
    except FileNotFoundError:
        return [
            {"name": "select_column", "description": "選擇特定欄位"},
            {"name": "value_counts", "description": "計算值的頻次"},
            {"name": "groupby", "description": "按欄位分組"},
            {"name": "sort_values", "description": "排序數據"},
            {"name": "filter_rows", "description": "過濾行數據"},
            {"name": "write", "description": "撰寫分析文本"}
        ]

# 此處應包含原有的其他類別定義 (TreeNode, OperationParser, ContentPlanner, SafeDataFrameOperator)
# 為節省空間，這裡只展示主要的修改部分


    
api_key = os.getenv("GOOGLE_API_KEY")


logger.info("Enhanced Tree-of-Report for Sports News Generation")
logger.info("="*60)

logger.info("正在載入數據...")

try:
    TABLES = pd.read_csv('filtered_set1.csv')
    logger.info(f"成功載入CSV: {TABLES.shape[0]} 行, {TABLES.shape[1]} 列")
except FileNotFoundError:
    logger.warning("找不到 filtered_set1.csv，使用示例數據")
    TABLES = pd.DataFrame({
        'type': ['smash', 'clear', 'drop', 'net', 'smash', 'clear', 'drop', 'drive'],
        'lose_reason': ['net', 'out', 'net', 'long', 'net', 'out', 'long', 'net'],
        'getpoint_player': ['Player_A', 'Player_B', 'Player_A', 'Player_B', 'Player_A', 'Player_B', 'Player_A', 'Player_B'],
        'score': [1, 2, 1, 3, 2, 1, 4, 2]
    })

# 使用增強版系統
MAX_DEPTH = 3
MAX_DEGREE = 3

enhanced_tree_report = EnhancedTreeOfReport(api_key, max_depth=MAX_DEPTH, max_degree=MAX_DEGREE)

logger.info("開始建構增強版報告樹...")
start_time = datetime.now()

try:
    # 建構樹（使用原有的build_tree邏輯，但替換關鍵方法）
    root = enhanced_tree_report.build_tree(TABLES)
    
    # 生成增強版文本
    enhanced_tree_report.generate_all_texts_enhanced(root)
    
    # 生成最終報告
    final_report = enhanced_tree_report.generate_final_report_enhanced(root)
    
    logger.info("\n" + "="*60)
    logger.info("ENHANCED TREE-OF-REPORT 最終報告")
    logger.info("="*60)
    print(final_report)
    
    end_time = datetime.now()
    duration = (start_time - end_time).total_seconds()
    
    logger.info(f"增強版報告生成完成，耗時: {duration:.2f} 秒")
    logger.info("生成的文件:")
    logger.info("- enhanced_tree_report.txt: 增強版最終報告")
    logger.info("- generation_quality_report.json: 生成品質分析")
    
except Exception as e:
    logger.error(f"程序執行失敗: {e}")
    import traceback
    traceback.print_exc()


2025-06-06 12:17:50,505 - INFO - Enhanced Tree-of-Report for Sports News Generation
2025-06-06 12:17:50,507 - INFO - 正在載入數據...
2025-06-06 12:17:50,509 - INFO - 成功載入CSV: 315 行, 9 列
2025-06-06 12:17:50,510 - INFO - 載入操作池: ['write', 'select_row', 'select_column', 'group_by', 'aggregate', 'value_counts', 'crosstab', 'pivot_table', 'sort', 'calculate']
2025-06-06 12:17:50,510 - INFO - 開始建構增強版報告樹...
2025-06-06 12:17:50,520 - ERROR - 創建子節點失敗: Object of type int64 is not JSON serializable
2025-06-06 12:17:52,337 - INFO - 操作成功，結果形狀: (315, 9)
2025-06-06 12:17:52,338 - INFO - 創建數據操作節點: select_row, 結果形狀: (315, 9)
2025-06-06 12:17:52,345 - ERROR - 創建子節點失敗: Object of type int64 is not JSON serializable


DataFrame successfully read, processed, and saved to tmp.csv


2025-06-06 12:17:54,946 - INFO - 操作成功，結果形狀: (258, 9)
2025-06-06 12:17:54,946 - INFO - 創建數據操作節點: select_row, 結果形狀: (258, 9)
2025-06-06 12:17:54,954 - ERROR - 創建子節點失敗: Object of type int64 is not JSON serializable


DataFrame 已成功儲存到 tmp.csv


2025-06-06 12:17:56,504 - INFO - 操作成功，結果形狀: (258, 9)
2025-06-06 12:17:56,505 - INFO - 創建數據操作節點: select_row, 結果形狀: (258, 9)


DataFrame操作成功並已保存到 tmp.csv


2025-06-06 12:17:58,560 - INFO - 操作成功，結果形狀: (258, 8)
2025-06-06 12:17:58,561 - INFO - 創建數據操作節點: select_column, 結果形狀: (258, 8)


DataFrame處理完成並已儲存到 tmp.csv


2025-06-06 12:18:00,665 - INFO - 操作成功，結果形狀: (31, 3)
2025-06-06 12:18:00,665 - INFO - 創建數據操作節點: group_by, 結果形狀: (31, 3)


group_by 操作完成，結果已儲存到 tmp.csv


2025-06-06 12:18:02,589 - INFO - 操作成功，結果形狀: (315, 3)
2025-06-06 12:18:02,589 - INFO - 創建數據操作節點: select_column, 結果形狀: (315, 3)


DataFrame successfully processed and saved to tmp.csv


2025-06-06 12:18:04,308 - INFO - 創建 write 節點: write (品質分數: 0.80)
2025-06-06 12:18:07,124 - INFO - 操作成功，結果形狀: (0, 3)
2025-06-06 12:18:07,124 - INFO - 創建數據操作節點: select_row, 結果形狀: (0, 3)


成功保存修改後的DataFrame到 tmp.csv


2025-06-06 12:18:08,541 - INFO - 操作成功，結果形狀: (315, 2)
2025-06-06 12:18:08,542 - INFO - 創建數據操作節點: select_column, 結果形狀: (315, 2)


CSV file processed and saved to tmp.csv


2025-06-06 12:18:10,514 - INFO - 操作成功，結果形狀: (315, 2)
2025-06-06 12:18:10,514 - INFO - 創建數據操作節點: group_by, 結果形狀: (315, 2)


發生錯誤：name 'all' is not defined


2025-06-06 12:18:12,328 - INFO - 操作成功，結果形狀: (31, 3)
2025-06-06 12:18:12,329 - INFO - 創建數據操作節點: group_by, 結果形狀: (31, 3)


group_by 操作完成，結果已儲存至 tmp.csv


2025-06-06 12:18:13,772 - INFO - 創建 write 節點: write (品質分數: 0.85)
2025-06-06 12:18:16,380 - INFO - 操作成功，結果形狀: (0, 3)
2025-06-06 12:18:16,380 - INFO - 創建數據操作節點: select_row, 結果形狀: (0, 3)


DataFrame successfully processed and saved to tmp.csv


2025-06-06 12:18:18,196 - INFO - 操作成功，結果形狀: (31, 2)
2025-06-06 12:18:18,197 - INFO - 創建數據操作節點: select_column, 結果形狀: (31, 2)


欄位選擇完成並已儲存至 tmp.csv


2025-06-06 12:18:20,219 - INFO - 操作成功，結果形狀: (31, 3)
2025-06-06 12:18:20,219 - INFO - 創建數據操作節點: group_by, 結果形狀: (31, 3)


group_by操作完成，結果已儲存到 tmp.csv


2025-06-06 12:18:21,917 - INFO - 操作成功，結果形狀: (315, 2)
2025-06-06 12:18:21,918 - INFO - 創建數據操作節點: select_column, 結果形狀: (315, 2)


選取欄位完成，並已儲存至 tmp.csv


2025-06-06 12:18:22,185 - ERROR - 生成失敗 (嘗試 1/3): 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/generate_content_free_tier_requests"
  quota_id: "GenerateRequestsPerMinutePerProjectPerModel-FreeTier"
  quota_dimensions {
    key: "model"
    value: "gemini-2.0-flash"
  }
  quota_dimensions {
    key: "location"
    value: "global"
  }
  quota_value: 15
}
, links {
  description: "Learn more about Gemini API quotas"
  url: "https://ai.google.dev/gemini-api/docs/rate-limits"
}
, retry_delay {
  seconds: 35
}
]
2025-06-06 12:18:24,431 - ERROR - 生成失敗 (嘗試 2/3): 429 You exceeded your current quota, please check your plan and billing details. For more information on this error, head to: https://ai.google.dev/gemini-api/docs/rate-limits. [violations {
  quota_metric: "generativelanguage.googleapis.com/

In [40]:
import os
import pandas as pd
import numpy as np
import google.generativeai as genai
import time
from datetime import datetime

# === 寫作風格詞彙 ===
BADMINTON_TERMS = {
    'net': '網前失誤', 'out': '出界', 'long': '過底線', 'smash': '殺球',
    'clear': '高遠球', 'drop': '切球', 'drive': '平抽球', 'serve': '發球', 'return': '回球'
}
ACTION_VERBS = ['展現', '發揮', '掌握', '運用', '施展', '控制', '主導', '壓制', '突破', '創造', '締造', '奠定', '確立', '鞏固', '扭轉', '逆轉']
TECHNICAL_TERMS = ['lose_reason', 'getpoint_player', 'type', 'column', 'row']

# === Gemini 模型初始化 ===
def init_model(api_key: str):
    genai.configure(api_key=api_key)
    return genai.GenerativeModel("gemini-2.0-flash")

# === 品質評估 ===
def assess_text_quality(text: str) -> float:
    score = 0.0
    if 30 <= len(text) <= 120:
        score += 0.2
    score += min(0.2, sum(1 for t in BADMINTON_TERMS.values() if t in text) * 0.1)
    score += min(0.2, sum(1 for v in ACTION_VERBS if v in text) * 0.05)
    if not any(t in text for t in TECHNICAL_TERMS):
        score += 0.2
    if '，' in text or '。' in text:
        score += 0.2
    return round(min(score, 1.0), 2)

# === 主流程：重複3次生成並評估 ===
def generate_best_of_three(df: pd.DataFrame, api_key: str):
    model = init_model(api_key)
    table_str = df.to_string(index=False)

    prompt_template = f"""
你是一位專業體育新聞記者，擅長撰寫羽球比賽報導。
請根據以下數據表格撰寫賽事描述，使用繁體中文，避免出現技術欄位名稱。

# 賽事數據表格：
{table_str}

請撰寫描述：
"""

    results = []
    for i in range(3):
        try:
            print(f"⏳ 第 {i+1}/3 次生成...")
            response = model.generate_content(prompt_template)
            time.sleep(1)
            text = response.text.strip() if response.text else "⚠️ 無內容"
        except Exception as e:
            text = f"⚠️ 生成錯誤: {e}"
        score = assess_text_quality(text)
        results.append({'index': i+1, 'text': text, 'score': score})

    # 選出最佳結果
    best = max(results, key=lambda x: x['score'])

    # 輸出到檔案
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    file_name = f"best_of_three_report_{timestamp}.txt"
    with open(file_name, "w", encoding="utf-8") as f:
        for r in results:
            f.write(f"[版本 {r['index']}] 品質分數: {r['score']}\n{r['text']}\n\n")
        f.write(f"🏆 最佳版本為第 {best['index']} 次，分數: {best['score']}\n")
        f.write(best['text'])

    print("\n✅ 所有版本已生成")
    for r in results:
        print(f"[{r['index']}] 分數: {r['score']} → {r['text']}")
    print(f"\n🏆 最佳版本是第 {best['index']} 次：{best['text']}")
    print(f"✔️ 已儲存至：{file_name}")
    return best

# === 測試入口 ===
if __name__ == "__main__":
    api_key = os.getenv("GOOGLE_API_KEY")
    if not api_key:
        raise RuntimeError("請設置 GOOGLE_API_KEY 環境變數")

    try:
        df = pd.read_csv("filtered_set1.csv")
    except FileNotFoundError:
        df = pd.DataFrame({
            'type': ['smash', 'clear', 'drop', 'net', 'smash', 'clear', 'drop', 'drive'],
            'lose_reason': ['net', 'out', 'net', 'long', 'net', 'out', 'long', 'net'],
            'getpoint_player': ['Player_A', 'Player_B', 'Player_A', 'Player_B', 'Player_A', 'Player_B', 'Player_A', 'Player_B'],
            'score': [1, 2, 1, 3, 2, 1, 4, 2]
        })

    generate_best_of_three(df, api_key)


⏳ 第 1/3 次生成...
⏳ 第 2/3 次生成...
⏳ 第 3/3 次生成...

✅ 所有版本已生成
[1] 分數: 0.7 → 這場羽球對決可謂高潮迭起，雙方選手你來我往，互不相讓。從一開始的試探性發球與輕巧過網球，到隨後逐漸加劇的攻防轉換，長球與短球的交錯使用，展現了兩位選手全面的技術能力。

第一分就經歷了多拍來回，雙方在網前短球與後場高遠球之間不斷轉換，最終A選手把握機會，一記精準的落點讓對手措手不及，先下一城。

隨後，雙方比分交替上升。A選手擅長利用落點刁鑽的切球來調動對手，而B選手則以強勁的扣殺作為回應。比賽中，多次出現多拍相持，雙方選手都展現了極佳的體能與控制能力。精采的攻防轉換讓觀眾目不暇給。

比賽過程中，雙方也出現了一些失誤，包括回球出界、觸網等，但整體而言，兩人表現出的競技水準仍然相當高。特別是後段，雙方體力消耗巨大，但依然堅持快速的攻防轉換，比分也因此呈現膠著狀態。最終，A選手以21:15的比分艱難勝出，贏得了這場激烈的比賽。
[2] 分數: 0.7 → 這場羽球賽可謂高潮迭起，雙方選手你來我往，毫不相讓。比賽伊始，雙方就展開了激烈的攻防，多次出現多拍來回的精彩場面。一方選手擅長運用多樣化的球路，包括輕巧的網前小球和角度刁鑽的斜線進攻，試圖調動對手；另一方則以強勁的後場扣殺和穩健的防守反擊見長。

比賽過程中，雙方比分交替上升，互不相讓。領先優勢多次易手，每一次得分都伴隨著場邊觀眾的歡呼和掌聲。選手們在場上奮力奔跑，每一次擊球都凝聚著力量和技巧，力求將球打到對方難以觸及的位置。

在關鍵分上，雙方都展現出了極高的心理素質。一方選手利用一次精準的判斷，迫使對手回球出界，成功拿下關鍵一分。然而，另一方選手也毫不示弱，隨後利用一記勢大力沉的扣殺，直接得分，將比分追平。

比賽最後階段，雙方體能都已接近極限，但依舊堅持著。最終，在一連串的精彩攻防後，一方選手抓住機會，利用對手的一個失誤，成功拿下制勝分，贏得了這場艱苦的比賽。整場比賽節奏緊湊，雙方實力接近，為觀眾奉獻了一場精彩絕倫的羽球盛宴。
[3] 分數: 0.75 → 這場羽球賽事可謂高潮迭起，雙方你來我往，攻防轉換迅速。從一開始的比分膠著，到後面的逐漸拉鋸，每一球都充滿了競爭。

首局開始，雙方都相當謹慎，試圖通過發球和過渡球來控制節奏。A選手率先取得領先，但B選手緊追不捨，多次通過