In [5]:
import pandas as pd
import sys
import os
import json
from tqdm import tqdm
sys.path.append('/Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience')
import asyncio
from call_api import call_gemini, async_call_gemini
from tqdm.asyncio import tqdm_asyncio

field_name = "职业项目"
field_path = f"/Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience/fields_csv/{field_name}.csv"
field_df = pd.read_csv(field_path)

field_df.shape

(351, 9)

In [5]:
prompt_template = """
You are an assistant who must classify the graduate program below as either **“professional”** (career-focused) or **“non-professional”** (academic / research-oriented).

────────────────────────────────────────────────────────────────────────
📌  What defines a *professional* program?
• **Skill-centric curriculum** – Courses concentrate on concrete industry tools and competencies (e.g., product management, data visualization, enterprise risk, UX design).  
• **Limited research emphasis** – Little or no mention of faculty-led research labs, theses, or publications; instead you see capstones, practicums, or internships.  
• **Flexible delivery** – Frequently online or hybrid, offered evenings/weekends, with multiple or rolling start dates (not just Fall) and variable part-time / full-time durations (≈ 9-24 months).  
• **Career-first language** – Website text highlights phrases like “advance your career,” “taught by industry experts,” “immediate ROI,” “professional network,” “salary growth,” etc.

🔍  Example programs (for context; do **not** classify these):
1. Columbia University – M.S. Enterprise Risk Management (School of Professional Studies)  
2. Harvard Extension School – A.L.M. in Management  
3. NYU School of Professional Studies – M.S. Integrated Marketing  
4. Johns Hopkins Engineering for Professionals – M.S. Cybersecurity  

Unless the website clearly meets the criteria above, label the program **non-professional**.  
────────────────────────────────────────────────────────────────────────

⚠️  **Output format**  
Return **exactly one line**: either  
``professional``  
or  
``non-professional``  
No additional words, punctuation, or explanations.

Links to consult:  
• Admissions URL: {admissions_url}  
• Program URL: {program_url}

Is this program a professional program?
"""

In [10]:
import os
import json
import asyncio
from tqdm.asyncio import tqdm_asyncio

# Async Gemini wrapper
from call_api import async_call_gemini

# ---------------------------------------------------------------------------
# Concurrency guard – avoid hitting rate-limits
# ---------------------------------------------------------------------------
semaphore = asyncio.Semaphore(2)            # max concurrent rows

# ---------------------------------------------------------------------------
# Per-row worker
# ---------------------------------------------------------------------------
async def process_row(row, prompt_template, num_vote: int, model_name: str):
    """
    1. Format the prompt for this row
    2. Launch `num_vote` Gemini calls in parallel
    3. Capture BOTH normal answers *and* every possible error case
    4. Return a serialisable record
    """
    async with semaphore:
        row    = row.to_dict()
        prompt = prompt_template.format(
            university     = row["大学英文名称"],
            degree         = row["学位"],
            program        = row["专业英文名称"],
            department     = row["所属院系"],
            admissions_url = row["招生网址"],
            program_url    = row["专业网址"],
        )

        record: dict = row.copy()
        record["llm_reponses"] = {}

        # -------- launch Gemini calls in parallel --------------------
        tasks = [
            async_call_gemini(
                prompt,
                model_name=model_name,
                use_search=True,
                url_context=True
            )
            for _ in range(num_vote)
        ]
        responses = await asyncio.gather(*tasks)

        # -------- post-process each response -------------------------
        for i, response in enumerate(responses):
            resp_key = f"response {i+1}"

            # -- 1. Transport / server-side errors (string starting "Error:")
            if isinstance(response, str) and response.startswith("Error:"):
                record["llm_reponses"][resp_key] = {
                    "error": response                       # e.g. "Error: 429 Rate limit …"
                }
                continue

            # -- 2. Empty / malformed response objects
            if not hasattr(response, "candidates") or not response.candidates:
                record["llm_reponses"][resp_key] = {
                    "error": "No candidates returned",
                    "raw_response": str(response)
                }
                continue

            # -- 3. Extract main answer text
            try:
                text = response.candidates[0].content.parts[0].text
            except Exception as e:
                record["llm_reponses"][resp_key] = {
                    "error": f"Cannot parse text: {e}",
                    "raw_response": str(response)
                }
                continue

            # -- 4. Extract additional metadata (best-effort)
            try:
                url_context = str(response.candidates[0].url_context_metadata)
            except Exception:
                url_context = "Not used"

            try:
                search_pages = (
                    f"Search Chunks: "
                    f"{response.candidates[0].grounding_metadata.grounding_chunks}"
                )
            except Exception:
                search_pages = "Not used"

            try:
                search_queries = (
                    f"Search Query: "
                    f"{response.candidates[0].grounding_metadata.web_search_queries}"
                )
            except Exception:
                search_queries = "Not used"

            try:
                search_support = (
                    f"Search Supports: "
                    f"{response.candidates[0].grounding_metadata.groundingSupports}"
                )
            except Exception:
                search_support = "Not used"

            # -- 5. Store normal answer + metadata + raw object
            record["llm_reponses"][resp_key] = {
                "response_text": text,
                "url_context": url_context,
                "search_queries": search_queries,
                "search_pages": search_pages,
                "search_support": search_support,
                "raw_response": str(response)             # keep for deep-debugging
            }

        return record

# ---------------------------------------------------------------------------
# Batch orchestrator with tqdm progress bar
# ---------------------------------------------------------------------------
async def request_and_store_async(prompt_template,
                                  field_df,
                                  num_vote: int,
                                  model_name: str,
                                  start_from: int = 0,
                                  end_at: int = -1):
    """
    Runs `process_row` over the dataframe slice asynchronously,
    shows a live tqdm bar, and dumps the results to JSON.
    """
    df = field_df.copy()[start_from:end_at]

    # Spawn tasks for every row in the slice
    tasks = [
        process_row(row, prompt_template, num_vote, model_name)
        for _, row in df.iterrows()
    ]

    # tqdm_asyncio.gather gives us progress updates as tasks complete
    response_records = await tqdm_asyncio.gather(*tasks)

    # Persist to disk ------------------------------------------------
    output_dir = f"../fields_records/{field_name}"
    os.makedirs(output_dir, exist_ok=True)
    output_path = f"{output_dir}/{field_name}_{model_name}_{start_from}_{end_at}.json"

    with open(output_path, "w") as f:
        json.dump(response_records, f, ensure_ascii=False, indent=2)

    return response_records

In [11]:
import nest_asyncio
nest_asyncio.apply()  # Only needed in Jupyter

num_vote = 3
start_from = 0
end_at = -1
model_name = "gemini-2.5-pro"
response_records = asyncio.run(
    request_and_store_async(prompt_template, field_df, num_vote, model_name, start_from=start_from, end_at=end_at)
)

100%|██████████| 350/350 [1:22:28<00:00, 14.14s/it]    


In [8]:
import json
import asyncio
from call_api import async_call_gemini
from tqdm.asyncio import tqdm_asyncio

async def fill_invalid_responses(json_file_path, prompt_template, validation_func, 
                                model_name="gemini-2.5-flash", max_retries=2):
    """
    检查JSON文件中的无效回答并重新请求，直到获得合法回答
    
    Args:
        json_file_path (str): JSON文件路径
        prompt_template (str): 提示词模板
        validation_func (function): 验证函数，返回(is_valid, status)
        model_name (str): 模型名称
        max_retries (int): 最大重试次数
    
    Returns:
        dict: 处理统计信息
    """
    
    def is_valid_professional_response(response_text):
        """验证职业项目分类回答是否合法"""
        if not response_text or response_text.strip() == "":
            return False, "空白回答"
        
        response_clean = response_text.strip().lower()
        
        if len(response_text) > 100:
            return False, "回答过长"
        
        if response_clean in ["professional", "non-professional"]:
            return True, "有效回答"
        
        return False, "无效分类"
    
    async def get_new_response(prompt):
        """获取新的回答"""
        try:
            response = await async_call_gemini(
                prompt,
                model_name=model_name,
                use_search=True,
                url_context=True
            )
            
            # 处理错误响应
            if isinstance(response, str) and response.startswith("Error:"):
                return None, f"API错误: {response}"
            
            # 处理空响应
            if not hasattr(response, "candidates") or not response.candidates:
                return None, "无candidates"
            
            # 提取回答文本
            try:
                text = response.candidates[0].content.parts[0].text
                return text, "成功获取"
            except Exception as e:
                return None, f"解析失败: {e}"
                
        except Exception as e:
            return None, f"请求异常: {e}"
    
    # 读取JSON文件
    print(f"读取文件: {json_file_path}")
    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    print(f"总项目数: {len(data)}")
    
    # 统计信息
    stats = {
        'total_projects': len(data),
        'total_responses': 0,
        'invalid_responses': 0,
        'fixed_responses': 0,
        'failed_fixes': 0,
        'retry_stats': {}
    }
    
    # 收集所有需要修复的项目
    projects_to_fix = []
    
    for i, project in enumerate(data):
        llm_responses = project.get('llm_reponses', {})
        
        for resp_key, resp_data in llm_responses.items():
            stats['total_responses'] += 1
            
            # 检查是否存在response_text字段
            if 'response_text' not in resp_data:
                # 如果没有response_text，可能是错误响应，标记为需要修复
                stats['invalid_responses'] += 1
                projects_to_fix.append((i, resp_key, project))
                continue
            
            # 验证回答有效性
            response_text = resp_data.get('response_text', '')
            is_valid, status = validation_func(response_text)
            
            if not is_valid:
                stats['invalid_responses'] += 1
                projects_to_fix.append((i, resp_key, project))
    
    print(f"发现无效回答: {stats['invalid_responses']} 个")
    
    if not projects_to_fix:
        print("所有回答都是有效的，无需修复")
        return stats
    
    # 限制并发数
    semaphore = asyncio.Semaphore(2)
    
    async def fix_single_response(project_index, resp_key, project):
        """修复单个回答"""
        async with semaphore:
            # 构建提示词
            prompt = prompt_template.format(
                university=project["大学英文名称"],
                degree=project["学位"],
                program=project["专业英文名称"],
                department=project["所属院系"],
                admissions_url=project["招生网址"],
                program_url=project["专业网址"],
            )
            
            for retry_count in range(1, max_retries + 2):  # +1 因为从1开始计数
                new_text, status = await get_new_response(prompt)
                
                if new_text is None:
                    if retry_count <= max_retries:
                        await asyncio.sleep(1)  # 短暂延迟
                        continue
                    else:
                        # 最终失败
                        return project_index, resp_key, None, f"修复失败: {status}", retry_count
                
                # 验证新回答
                is_valid, validation_status = validation_func(new_text)
                
                if is_valid:
                    # 成功获得有效回答
                    return project_index, resp_key, new_text, f"修复成功(第{retry_count}次尝试)", retry_count
                else:
                    if retry_count <= max_retries:
                        await asyncio.sleep(1)
                        continue
                    else:
                        # 重试次数用完但仍无效
                        return project_index, resp_key, new_text, f"修复失败: {validation_status}", retry_count
            
            return project_index, resp_key, None, "意外错误", max_retries + 1
    
    # 并行修复所有无效回答
    print(f"开始修复 {len(projects_to_fix)} 个无效回答...")
    
    tasks = [
        fix_single_response(project_index, resp_key, project)
        for project_index, resp_key, project in projects_to_fix
    ]
    
    results = await tqdm_asyncio.gather(*tasks)
    
    # 更新数据和统计
    for project_index, resp_key, new_text, status, retry_count in results:
        # 更新重试统计
        if retry_count not in stats['retry_stats']:
            stats['retry_stats'][retry_count] = 0
        stats['retry_stats'][retry_count] += 1
        
        if new_text is not None and "修复成功" in status:
            # 更新原始数据
            data[project_index]['llm_reponses'][resp_key]['response_text'] = new_text
            data[project_index]['llm_reponses'][resp_key]['fix_status'] = status
            data[project_index]['llm_reponses'][resp_key]['fix_attempts'] = retry_count
            stats['fixed_responses'] += 1
        else:
            # 修复失败，标记状态
            data[project_index]['llm_reponses'][resp_key]['fix_status'] = status
            data[project_index]['llm_reponses'][resp_key]['fix_attempts'] = retry_count
            stats['failed_fixes'] += 1
    
    # 保存修复后的文件
    output_path = json_file_path.replace('.json', '_fixed.json')
    with open(output_path, 'w', encoding='utf-8') as f:
        json.dump(data, f, ensure_ascii=False, indent=2)
    
    # 打印统计信息
    print(f"\n修复完成!")
    print(f"总回答数: {stats['total_responses']}")
    print(f"无效回答数: {stats['invalid_responses']}")
    print(f"成功修复: {stats['fixed_responses']}")
    print(f"修复失败: {stats['failed_fixes']}")
    print(f"修复成功率: {stats['fixed_responses']/stats['invalid_responses']*100:.1f}%")
    
    print(f"\n重试统计:")
    for attempts, count in stats['retry_stats'].items():
        print(f"第{attempts}次尝试: {count} 个回答")
    
    print(f"\n修复后的文件已保存到: {output_path}")
    
    return stats

# 使用示例 - 职业项目分类
async def fix_professional_classification():
    """修复职业项目分类的无效回答"""
    
    def is_valid_professional_response(response_text):
        """验证职业项目分类回答是否合法"""
        if not response_text or response_text.strip() == "":
            return False, "空白回答"
        
        response_clean = response_text.strip().lower()
        
        if len(response_text) > 100:
            return False, "回答过长"
        
        if response_clean in ["professional", "non-professional"]:
            return True, "有效回答"
        
        return False, "无效分类"
    
    prompt_template = """
You are an assistant who must classify the graduate program below as either **"professional"** (career-focused) or **"non-professional"** (academic / research-oriented).

────────────────────────────────────────────────────────────────────────
📌  What defines a *professional* program?
• **Skill-centric curriculum** – Courses concentrate on concrete industry tools and competencies (e.g., product management, data visualization, enterprise risk, UX design).  
• **Limited research emphasis** – Little or no mention of faculty-led research labs, theses, or publications; instead you see capstones, practicums, or internships.  
• **Flexible delivery** – Frequently online or hybrid, offered evenings/weekends, with multiple or rolling start dates (not just Fall) and variable part-time / full-time durations (≈ 9-24 months).  
• **Career-first language** – Website text highlights phrases like "advance your career," "taught by industry experts," "immediate ROI," "professional network," "salary growth," etc.

🔍  Example programs (for context; do **not** classify these):
1. Columbia University – M.S. Enterprise Risk Management (School of Professional Studies)  
2. Harvard Extension School – A.L.M. in Management  
3. NYU School of Professional Studies – M.S. Integrated Marketing  
4. Johns Hopkins Engineering for Professionals – M.S. Cybersecurity  

Unless the website clearly meets the criteria above, label the program **non-professional**.  
────────────────────────────────────────────────────────────────────────

⚠️  **Output format**  
Return **exactly one line**: either  
``professional``  
or  
``non-professional``  
No additional words, punctuation, or explanations.

Links to consult:  
• Admissions URL: {admissions_url}  
• Program URL: {program_url}

Is this program a professional program?
"""

    json_file_path = "/Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience/fields_records/职业项目/职业项目_gemini-2.5-pro_0_-1.json"
    
    stats = await fill_invalid_responses(
        json_file_path=json_file_path,
        prompt_template=prompt_template,
        validation_func=is_valid_professional_response,
        model_name="gemini-2.5-flash",
        max_retries=2
    )
    
    return stats

# 运行修复
import nest_asyncio
nest_asyncio.apply()

# 执行修复
stats = asyncio.run(fix_professional_classification())

读取文件: /Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience/fields_records/职业项目/职业项目_gemini-2.5-pro_0_-1.json
总项目数: 350
发现无效回答: 361 个
开始修复 361 个无效回答...


  0%|          | 0/361 [00:00<?, ?it/s]

100%|██████████| 361/361 [43:14<00:00,  7.19s/it]


修复完成!
总回答数: 1050
无效回答数: 361
成功修复: 357
修复失败: 4
修复成功率: 98.9%

重试统计:
第1次尝试: 320 个回答
第2次尝试: 33 个回答
第3次尝试: 8 个回答

修复后的文件已保存到: /Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience/fields_records/职业项目/职业项目_gemini-2.5-pro_0_-1_fixed.json





In [11]:
import json
import pandas as pd
from collections import Counter

def process_professional_classification(json_file_path, output_csv_path):
    """
    Process professional/non-professional classification from JSON file with majority voting.
    
    Args:
        json_file_path (str): Path to the JSON file containing professional classification data
        output_csv_path (str): Path to save the output CSV file
    
    Returns:
        pd.DataFrame: DataFrame with classification results and statistics
    """
    
    def is_valid_response(response_text):
        """Check if a response is valid (professional or non-professional)"""
        if not response_text or response_text.strip() == "":
            return False, "空白回答"
        
        response_clean = response_text.strip().lower()
        
        # Check for overly long responses (likely explanatory text rather than classification)
        if len(response_text) > 100:
            return False, "回答过长"
        
        # Check if response is exactly one of the two valid classifications
        if response_clean in ["professional", "non-professional"]:
            return True, "有效回答"
        
        return False, "无效分类"
    
    # Load JSON data
    with open(json_file_path, 'r', encoding='utf-8') as f:
        data = json.load(f)
    
    results = []
    stats = {
        'total_programs': 0,
        'unanimous_professional': 0,
        'unanimous_non_professional': 0,
        'majority_professional': 0,
        'majority_non_professional': 0,
        'tied_votes': 0,
        'all_invalid': 0,
        'total_valid_responses': 0,
        'total_invalid_responses': 0
    }
    
    for program in data:
        stats['total_programs'] += 1
        
        # Extract basic info
        university = program.get('大学英文名称', '')
        degree = program.get('学位', '')
        major = program.get('专业英文名称', '')
        school = program.get('所属院系', '')
        
        # Extract responses
        llm_responses = program.get('llm_reponses', {})
        
        # Process each response
        response_data = {}
        valid_responses = []
        
        for response_num in range(1, 4):
            response_key = f"response {response_num}"
            response_text = ""
            
            if response_key in llm_responses:
                response_text = llm_responses[response_key].get('response_text', '')
            
            # Validate response
            is_valid, status = is_valid_response(response_text)
            
            # Store response details
            response_data[f'response_{response_num}_text'] = response_text
            response_data[f'response_{response_num}_status'] = status
            response_data[f'response_{response_num}_valid'] = is_valid
            
            # Add to valid responses if valid
            if is_valid:
                valid_responses.append(response_text.strip().lower())
        
        # Count valid and invalid responses
        valid_count = len(valid_responses)
        invalid_count = 3 - valid_count
        stats['total_valid_responses'] += valid_count
        stats['total_invalid_responses'] += invalid_count
        
        # Determine final classification
        if valid_count == 0:
            final_classification = "需要额外确认"
            voting_status = "所有回答均无效"
            stats['all_invalid'] += 1
        else:
            # Count votes
            vote_counts = Counter(valid_responses)
            professional_votes = vote_counts.get('professional', 0)
            non_professional_votes = vote_counts.get('non-professional', 0)
            
            if professional_votes == non_professional_votes:
                # Tie
                final_classification = "需要额外确认"
                voting_status = f"票数相等({professional_votes}票professional vs {non_professional_votes}票non-professional)"
                stats['tied_votes'] += 1
            elif professional_votes > non_professional_votes:
                final_classification = 'professional'
                if valid_count == professional_votes:  # All valid votes are professional
                    voting_status = f"一致投票({valid_count}/3有效)"
                    stats['unanimous_professional'] += 1
                else:
                    voting_status = f"多数票({professional_votes}票professional vs {non_professional_votes}票non-professional)"
                    stats['majority_professional'] += 1
            else:  # non_professional_votes > professional_votes
                final_classification = 'non-professional'
                if valid_count == non_professional_votes:  # All valid votes are non-professional
                    voting_status = f"一致投票({valid_count}/3有效)"
                    stats['unanimous_non_professional'] += 1
                else:
                    voting_status = f"多数票({non_professional_votes}票non-professional vs {professional_votes}票professional)"
                    stats['majority_non_professional'] += 1
        
        # Create result record
        result_record = {
            '大学英文名称': university,
            '学位': degree,
            '专业英文名称': major,
            '所属院系': school,
            '职业项目': final_classification,
            '投票状态': voting_status
        }
        
        # Add response details
        result_record.update(response_data)
        
        results.append(result_record)
    
    # Create DataFrame
    df = pd.DataFrame(results)
    
    # Save to CSV
    df.to_csv(output_csv_path, index=False, encoding='utf-8-sig')
    
    # Print statistics
    print("职业项目分类统计:")
    print(f"总项目数: {stats['total_programs']}")
    print(f"一致认为是professional: {stats['unanimous_professional']} ({stats['unanimous_professional']/stats['total_programs']*100:.1f}%)")
    print(f"一致认为是non-professional: {stats['unanimous_non_professional']} ({stats['unanimous_non_professional']/stats['total_programs']*100:.1f}%)")
    print(f"多数票professional: {stats['majority_professional']} ({stats['majority_professional']/stats['total_programs']*100:.1f}%)")
    print(f"多数票non-professional: {stats['majority_non_professional']} ({stats['majority_non_professional']/stats['total_programs']*100:.1f}%)")
    print(f"票数相等: {stats['tied_votes']} ({stats['tied_votes']/stats['total_programs']*100:.1f}%)")
    print(f"所有回答均无效: {stats['all_invalid']} ({stats['all_invalid']/stats['total_programs']*100:.1f}%)")
    print(f"总有效回答数: {stats['total_valid_responses']}")
    print(f"总无效回答数: {stats['total_invalid_responses']}")
    
    if stats['total_programs'] > 0:
        print(f"平均每项目有效回答数: {stats['total_valid_responses']/stats['total_programs']:.2f}")
    
    return df, stats

# Usage:
df, stats = process_professional_classification(
    '/Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience/fields_records/职业项目/职业项目_gemini-2.5-pro_0_-1_fixed.json',
    '/Users/yijingyang/Library/CloudStorage/OneDrive-个人/GradPilot/ProgramDB/DataScience/fields_records/职业项目.csv'
)

职业项目分类统计:
总项目数: 350
一致认为是professional: 170 (48.6%)
一致认为是non-professional: 130 (37.1%)
多数票professional: 25 (7.1%)
多数票non-professional: 25 (7.1%)
票数相等: 0 (0.0%)
所有回答均无效: 0 (0.0%)
总有效回答数: 1046
总无效回答数: 4
平均每项目有效回答数: 2.99
