# 步骤1: 爬虫问卷库

In [101]:
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import re

In [102]:
# ============================================================================
# 步骤1: 爬虫问卷库管理
# 负责：接收和存储从各旅行平台爬取的问题数据
# ============================================================================

@dataclass
class TravelQuestion:
    """
    旅行问题数据类
    用于存储从各平台爬取的问题数据
    """
    question_id: str      # 问题唯一标识符
    text: str            # 问题文本内容
    question_type: str   # 问题类型：单选、多选、开放性问题
    options: List[str]   # 选项列表（选择题才有）
    source_platform: str # 数据来源平台
    original_category: str  # 原始分类（爬虫初步分类）

class CrawlerQuestionBank:
    """
    爬虫问题库管理类
    主要功能：存储和管理从各平台爬取的问题数据
    """
   
    def __init__(self):
        # 使用字典存储所有问题，key是question_id，value是TravelQuestion对象
        self.questions = {}
        # 记录所有数据来源平台
        self.platforms = set()
    
    def add_crawled_questions(self, crawled_data: List[Dict]) -> Dict[str, Any]:
        """
        批量添加爬虫获取的问题数据
        输入：爬虫组提供的原始数据列表
        输出：处理结果统计（成功数量、错误信息等）
        """
        # 待实现：数据验证、去重、存储逻辑
        # 这里需要检查数据格式，去除重复问题，生成唯一question_id
        results = {'added_count': 0, 'errors': []}
        return results
    
    def get_questions_by_platform(self, platform: str) -> List[TravelQuestion]:
        """
        根据平台名称筛选问题
        例如：获取所有来自Booking的问题
        """
        # 待实现：平台筛选逻辑
        return []
    
    def get_question_count_by_platform(self) -> Dict[str, int]:
        """
        统计各平台的问题数量
        输出如：{'Ctrip': 50, 'Booking.com': 30, ...}
        """
        # 待实现：平台统计逻辑
        return {}

# 步骤2：数据集成和预处理 

In [103]:
# ============================================================================
# 步骤2: 数据集成和预处理
# 负责：清洗和标准化爬取的原始数据
# ============================================================================

class DataIntegrationPreprocessor:
    """
    数据集成和预处理类
    主要功能：对原始数据进行清洗、分类、复杂度评估
    """

    # 1. 旅行领域关键词词典，用于自动分类问题，需根据实际问卷内容继续完善
    def __init__(self):
        self.travel_keywords = {
            '住宿': 'accommodation', '酒店': 'accommodation', '民宿': 'accommodation',
            '交通': 'transportation', '航班': 'transportation', '机票': 'transportation',
            '火车': 'transportation', '地铁': 'transportation', '租车': 'transportation',
            '美食': 'food', '餐厅': 'food', '小吃': 'food', '早餐': 'food',
            '景点': 'attraction', '门票': 'attraction', '博物馆': 'attraction',
            '签证': 'visa', '护照': 'visa', '入境': 'visa', '证件': 'visa',
            '预算': 'budget', '价格': 'budget', '费用': 'budget', '花费': 'budget'
        }
    
    # 2. 数据集成和清洗的主要方法
    # 输入：原始问题数据
    # 输出：清洗后的结构化数据列表
    def integrate_and_clean(self, raw_questions: List[Any]) -> List[Dict[str, Any]]:
        processed_questions_map = {}  # 用字典存储，key是问题文本，value是处理后的question
        text_frequency = {}  # 单一问题出现频率
        
        for i, raw_question in enumerate(raw_questions):
            try:
                # 1. 提取问题文本
                question_text = self._extract_question_text(raw_question)
                if not question_text:  # 跳过空文本
                    continue
                
                # 2. 文本清洗
                cleaned_text = self._clean_text(question_text)
                if not cleaned_text:  # 跳过清洗后为空的文本
                    continue
                
                # 3. 统计频率（无论是否重复都统计）
                text_frequency[cleaned_text] = text_frequency.get(cleaned_text, 0) + 1
                
                # 4. 如果这个问题已经处理过，跳过重复处理
                if cleaned_text in processed_questions_map:
                    continue
                
                # 5. 提取和标准化选项
                raw_options = self._extract_options(raw_question)
                standardized_options = self._standardize_options(raw_options)
                
                # 6. 确定问题类型（根据选项数量）
                question_type = self._determine_question_type(standardized_options)
                
                # 7. 自动分类和复杂度评估
                category = self.categorize_question(cleaned_text)
                complexity = self.calculate_complexity(cleaned_text, standardized_options, question_type)
                
                # 8. 构建结构化输出（包含频率信息）
                processed_question = {
                    'question_id': getattr(raw_question, 'question_id', f'Q{i:04d}'),#问题id
                    'text': cleaned_text,#简单处理字符
                    'options': standardized_options,#提取选项
                    'question_type': question_type,#问题类型，如单选、多选、开放性问题
                    'category': category,#自动分类结果，如住宿、交通
                    'complexity': complexity,#基于：文本长度、选项数量、问题类型、关键词的问题复杂度
                    'frequency': text_frequency[cleaned_text],#清洗后问题出现次数 
                    'source_platform': getattr(raw_question, 'source_platform', 'unknown'),
                    'original_category': getattr(raw_question, 'original_category', 'unknown')
                }
                
                # 9. 存储到字典中（key是问题文本）
                processed_questions_map[cleaned_text] = processed_question
                
            except Exception as e:
                # 跳过处理失败的问题
                continue
        
        # 返回所有唯一问题的列表
        return list(processed_questions_map.values())
    
    # 3. 频率统计
    def get_question_frequency_report(self, processed_questions: List[Dict]) -> Dict[str, Any]:
        """生成问题频率统计报告"""
        if not processed_questions:
            return {}
        
        frequency_report = {
            'total_questions': len(processed_questions),
            'unique_questions': len(set(q['text'] for q in processed_questions)),
            'top_questions': [],
            'frequency_distribution': {}  # 频率分布：出现1次的问题有多少，2次的有多少...
        }
        
        # 统计每个问题的频率（从processed_questions中获取）
        text_frequency = {}
        for question in processed_questions:
            text = question['text']
            frequency = question.get('frequency', 1)  # 获取频率，如果没有则默认为1
            text_frequency[text] = frequency
        
        # 找出最高频的问题（按频率排序）
        sorted_frequency = sorted(text_frequency.items(), key=lambda x: x[1], reverse=True)
        frequency_report['top_questions'] = [
            {'question': text, 'frequency': freq} 
            for text, freq in sorted_frequency[:10]  # 前10个最高频问题
        ]
        
        # 统计频率分布
        freq_dist = {}
        for count in text_frequency.values():
            freq_dist[count] = freq_dist.get(count, 0) + 1
        frequency_report['frequency_distribution'] = freq_dist
        
        return frequency_report

    # 4.函数支持  
    def _extract_question_text(self, raw_question: Any) -> str:
        """从原始数据中提取问题文本"""
        if hasattr(raw_question, 'text'):
            return raw_question.text
        elif isinstance(raw_question, dict) and 'text' in raw_question:
            return raw_question['text']
        return ""
    
    def _clean_text(self, text: str) -> str:
        """清理问题文本：移除HTML标签、特殊字符、多余空格"""
        if not text: 
            return ""
        # 移除HTML标签
        cleaned = re.sub(r'<[^>]+>', '', text)
        # 只保留中文、英文、数字和基本标点
        cleaned = re.sub(r'[^\w\u4e00-\u9fff\s\.\?\!]', ' ', cleaned)
        # 合并多个空格为单个空格
        cleaned = re.sub(r'\s+', ' ', cleaned).strip()
        return cleaned
    
    def _extract_options(self, raw_question: Any) -> List[str]:
        """从原始数据中提取选项"""
        if hasattr(raw_question, 'options'):
            return raw_question.options
        elif isinstance(raw_question, dict) and 'options' in raw_question:
            return raw_question['options']
        return []
    
    def _standardize_options(self, options: List[str]) -> List[str]:
        """标准化选项格式：清理空白、移除编号前缀"""
        if not options: 
            return ["是", "否"]  # 默认选项
        
        standardized = []
        for option in options:
            if not option: 
                continue
            # 移除"A."、"1)"等编号前缀
            cleaned = re.sub(r'^[A-Za-z0-9][\.、\)）]?\s*', '', option.strip())
            # 清理空格
            cleaned = re.sub(r'\s+', ' ', cleaned).strip()
            if cleaned: 
                standardized.append(cleaned)
        
        return standardized if standardized else ["是", "否"]
    
    def _determine_question_type(self, options: List[str]) -> str:
        """根据选项确定问题类型"""
        count = len(options)
        if count == 0: 
            return "open_ended"  # 开放性问题
        elif count == 2 and any(opt in ["是", "否", "有", "没有"] for opt in options): 
            return "yes_no"  # 是否问题
        elif count <= 4: 
            return "single_choice"  # 单选题
        else: 
            return "multiple_choice"  # 多选题
    
    def calculate_complexity(self, text: str, options: List[str], question_type: str) -> str:
        """
        计算问题复杂度
        基于：文本长度、选项数量、问题类型、关键词难度
        """
        score = 0
        
        # 文本长度评分
        if len(text) > 60: 
            score += 3  # 长文本，复杂度高
        elif len(text) > 40: 
            score += 2  # 中等文本
        elif len(text) > 20: 
            score += 1  # 短文本
        
        # 选项数量评分
        if len(options) > 6: 
            score += 2  # 选项多，复杂度高
        elif len(options) > 4: 
            score += 1  # 选项中等
        
        # 问题类型评分
        if question_type == "open_ended": 
            score += 2  # 开放性问题较难
        elif question_type == "multiple_choice": 
            score += 1  # 多选题中等
        
        # 关键词难度评分
        hard_words = ['签证', '政策', '规划', '预算', '比较', '为什么']
        if any(word in text for word in hard_words): 
            score += 2  # 包含复杂概念
        
        # 最终分类
        if score >= 6: 
            return "hard"
        elif score >= 3: 
            return "medium"
        else: 
            return "easy"
    
    def categorize_question(self, text: str) -> str:
        """基于关键词自动分类问题"""
        if not text: 
            return "general"
        # 遍历关键词词典，找到匹配的分类
        for chinese, english in self.travel_keywords.items():
            if chinese in text: 
                return english
        return "general"  # 默认分类

# 步骤3: 聚类分析

In [104]:
# ============================================================================
# 步骤3: 聚类分析
# 负责：使用K-means等算法对相似问题进行分组
# ============================================================================

class QuestionClusterAnalyzer:
    def __init__(self, n_clusters: int = 6):
        self.n_clusters = n_clusters  # 聚类数量：视数据情况定义
        self.is_trained = False  # 模型训练状态
        # 待添加：向量化器、聚类模型等
    
    def perform_cluster_analysis(self, processed_questions: List[Dict]) -> Dict[str, Any]:
        """
        执行完整的聚类分析流程
        输入：清洗后的问题数据
        输出：聚类结果和统计分析
        """
        # 待实现的具体聚类步骤：
        # 1. 文本向量化（TF-IDF或Word2Vec）
        # 2. K-means聚类算法
        # 3. 聚类结果分析（轮廓系数等）
        # 4. 提取每个聚类的关键词
        
        cluster_results = {
            'cluster_count': self.n_clusters,#使用的聚类数量
            'questions_processed': len(processed_questions),#处理的问题总数
            'cluster_assignments': [],  # 每个问题属于哪个聚类
            'silhouette_score': 0.0,   # 聚类效果评分
            'cluster_keywords': {}     # 每个聚类的特征关键词
        }
        self.is_trained = True
        return cluster_results
    
    def assign_cluster(self, question_text: str) -> int:
        """为新问题分配所属聚类"""
        if not self.is_trained:
            return 0  # 模型未训练时返回默认聚类
        # 待实现：使用训练好的模型预测新问题的聚类
        return 0
    
    def get_cluster_keywords(self, cluster_id: int) -> List[str]:
        """获取指定聚类的特征关键词"""
        # 待实现：分析聚类中心，提取代表性词汇
        return []


# 步骤4: 问题层次和递归关系

In [105]:
# ============================================================================
# 步骤4: 问题层次和递归关系
# 负责：建立问题间的逻辑关系网络，实现个性化问卷
# ============================================================================

@dataclass
class QuestionNode:
    """
    问题节点类
    用于构建问题的层次结构（树状关系）
    """
    question_id: str           # 问题ID
    question_text: str         # 问题文本
    cluster_id: int           # 所属聚类
    complexity: str           # 复杂度等级
    category: str             # 问题类别
    parent_id: Optional[str] = None      # 父问题ID（用于构建层次）
    children: List[str] = None           # 子问题ID列表

    def __post_init__(self):
        """初始化后处理：确保children不为None"""
        if self.children is None:
            self.children = []

class QuestionHierarchyBuilder:
    """
    问题层次构建器
    主要功能：基于聚类结果构建问题的逻辑关系网络
    """
    
    def __init__(self, cluster_analyzer: QuestionClusterAnalyzer):
        self.cluster_analyzer = cluster_analyzer
        self.question_nodes = {}  # 存储所有问题节点
        self.cluster_hierarchies = {}  # 按聚类分组的问题
    
    def build_hierarchy_from_clusters(self, processed_questions: List[Dict]) -> Dict[str, Any]:
        """
        基于聚类结果构建问题层次结构
        输入：清洗和聚类后的问题数据
        输出：层次结构构建结果
        """
        # 待实现的构建策略：
        # 1. 基于聚类构建：同一聚类的问题建立关联，比如：所有关于"住宿"的问题放在一起
        # 2. 基于复杂度构建：简单→中等→复杂的递进关系，比如：先问简单的"你喜欢旅行吗？"，再问复杂的"你的旅行预算是多少？"
        # 3. 基于类别构建：相关类别问题的横向关联，比如：住宿问题和交通问题可能有逻辑关系
        
        hierarchy_results = {
            'total_nodes': len(self.question_nodes),
            'max_depth': 0,           # 层次最大深度
            'root_nodes': [],         # 根节点列表
            'leaf_nodes': []          # 叶子节点列表
        }
        return hierarchy_results
    
    def get_next_questions(self, current_question_id: str, user_answers: Dict[str, str]) -> List[str]:
        """
        根据当前问题和用户回答，推荐下一个问题
        实现个性化问卷的核心逻辑
        """
        # 待实现的推荐策略：
        # 1. 基于子节点关系，比如：回答"你喜欢旅行吗？"为"是"，就推荐子问题"你喜欢哪种旅行方式？"
        # 2. 基于同聚类其他问题，比如：用户正在回答住宿问题，推荐其他住宿相关问题
        # 3. 基于用户回答内容的分析，比如：用户说"预算有限"，就推荐经济型相关问题
        # 4. 基于问题类别关联，比如：用户回答了住宿问题，推荐相关的交通问题
        
        return []  # 返回推荐的问题ID列表
    
    def get_question_relationships(self) -> List[Dict[str, Any]]:
        """获取所有问题的关系数据（用于可视化）"""
        # 待实现：提取问题间的父子关系、聚类关系等，用于绘制关系图
        return []

# 步骤5:个性化调查问卷实现

In [106]:
# ============================================================================
# 步骤5: 个性化问卷实现
# 负责：整合前4个步骤，通过问题层次关系，实现个性化调查问卷
# ============================================================================

class TravelSurveySystem: 
    def __init__(self):
        # 待初始化：前4个步骤的组件实例
        self.crawler_bank = None
        self.preprocessor = None
        self.cluster_analyzer = None
        self.hierarchy_builder = None
        self.user_sessions = {}  # 存储用户会话数据
    
    def process_complete_workflow(self, crawled_data: List[Dict]) -> Dict[str, Any]:
        """
        执行完整的工作流程（步骤1-4）
        输入：爬虫原始数据
        输出：各步骤处理结果
        """
        # 待实现的完整流程：
        # 1. 数据存储（步骤1）
        # 2. 数据清洗（步骤2）
        # 3. 聚类分析（步骤3）
        # 4. 层次构建（步骤4）
        
        results = {
            'crawl_results': {},          # 步骤1结果
            'preprocessing_results': {},   # 步骤2结果
            'cluster_results': {},         # 步骤3结果
            'hierarchy_results': {}        # 步骤4结果
        }
        return results
    
    def start_personalized_survey(self, user_id: str, user_profile: Dict[str, Any]) -> str:
        """
        开始个性化问卷调查
        输入：用户ID和用户画像（年龄、旅行经验等）
        输出：会话ID
        """
        # 待实现：基于用户画像选择初始问题
        session_id = f"user_{user_id}"
        # 创建用户会话，记录回答历史和当前问题
        self.user_sessions[session_id] = {
            'user_id': user_id,
            'profile': user_profile,
            'answers': {},      # 已回答的问题和答案
            'current_questions': []  # 当前待回答的问题
        }
        return session_id
    
    def submit_answer(self, session_id: str, question_id: str, answer: str) -> List[str]:
        """
        提交用户答案并获取下一个问题
        实现动态问卷的核心功能
        """
        # 待实现：
        # 1. 记录用户答案
        # 2. 基于层次关系推荐下一个问题
        # 3. 更新会话状态
        
        return []  # 返回下一个问题ID列表
    
    def generate_survey_report(self, session_id: str) -> Dict[str, Any]:
        """生成个性化调查问卷"""
        # 待实现：分析用户回答，根据用户回答，生成个性化洞察和建议，同时对用户进行分类
        return {}
    
    def get_system_statistics(self) -> Dict[str, Any]:
        """获取系统运行统计数据"""
        # 待实现：问题库大小、用户数量、问卷完成率等
        return {}

# 步骤6 数据可视化

In [107]:
# ============================================================================
# 步骤6: 数据可视化
# 负责：生成各种图表和仪表板展示数据分析结果
# ============================================================================

class DataVisualization:
    """
    数据可视化类
    主要功能：生成各种图表展示系统状态和分析结果
    """
    
    def __init__(self):
        self.chart_templates = {}  # 图表模板配置
    
    def create_cluster_visualization(self, cluster_data: Dict[str, Any]) -> str:
        """
        创建聚类结果可视化图表
        显示：问题在二维空间的分布，不同聚类用不同颜色
        """
        # 待实现：使用matplotlib/seaborn创建散点图
        # 显示各个聚类的分布情况和聚类中心
        return "cluster_visualization.png"  # 返回图表文件路径
    
    def create_category_distribution_chart(self, category_data: Dict[str, int]) -> str:
        """
        创建问题类别分布图
        显示：各旅行问题类别的问题数量分布
        """
        # 待实现：创建柱状图或饼图
        # 展示住宿、交通、美食等类别的问题数量
        return "category_distribution.png"
    
    def create_complexity_analysis_chart(self, complexity_data: Dict[str, int]) -> str:
        """
        创建复杂度分析图
        显示：简单、中等、复杂问题的比例分布
        """
        # 待实现：创建堆叠柱状图或环形图
        return "complexity_analysis.png"
    
    def create_platform_comparison_chart(self, platform_data: Dict[str, int]) -> str:
        """
        创建平台数据对比图
        显示：不同旅行平台的问题数量和质量对比
        """
        # 待实现：创建多系列柱状图
        # 比较Ctrip、Booking.com等平台的数据特征
        return "platform_comparison.png"
    
    def create_survey_progress_dashboard(self, survey_data: Dict[str, Any]) -> str:
        """
        创建调查进度仪表板
        显示：问卷完成情况、用户参与度等
        """
        # 待实现：创建综合仪表板
        return "survey_dashboard.html"
    
    def create_question_relationship_graph(self, relationship_data: List[Dict]) -> str:
        """
        创建问题关系网络图
        显示：问题间的层次关系和关联网络
        """
        # 待实现：使用networkx创建网络图
        # 节点表示问题，边表示关联关系
        return "question_relationship.png"
    
    def create_user_analysis_report(self, user_data: Dict[str, Any]) -> str:
        """
        创建单个用户的个性化分析报告
        输入：用户数据，包括用户画像、回答记录、分类结果、维度得分等
        输出：可视化（如html）
        """
        # 待实现：报告内容：用户类型，分类依据（关键回答），特征雷达图（展示用户在多个维度的得分），个性化旅行建议（目的地、活动、预算等）实现详细分析（如html实现），主要包括整体用户行为分析、偏好洞察，以及单个用户输入完所有问题的反馈（用户类型及个性化建议）
        return "user_analysis_report.html"
    
    def create_allusers_analysis_report(self, system_data: Dict[str, Any]) -> str:
        """生成整体用户分析报告
           用户总数、分类情况、完成率等
        """
        return "allusers_analysis_report.html"  
    
    def export_all_visualizations(self, system_data: Dict[str, Any]) -> Dict[str, str]:
        """
        导出所有可视化图表
        输入：系统各模块的数据
        输出：各类图表文件的路径字典
        """
        visualizations = {
            'clusters': self.create_cluster_visualization(system_data.get('cluster_data', {})),
            'categories': self.create_category_distribution_chart(system_data.get('category_data', {})),
            'complexity': self.create_complexity_analysis_chart(system_data.get('complexity_data', {})),
            'platforms': self.create_platform_comparison_chart(system_data.get('platform_data', {})),
            'survey_dashboard': self.create_survey_progress_dashboard(system_data.get('survey_data', {})),
            'relationships': self.create_question_relationship_graph(system_data.get('relationship_data', [])),
            'individual_user_report': self.create_user_analysis_report(system_data.get('user_data', {})),
            'all_users_report': self.create_allusers_analysis_report(system_data.get('system_stats', {}))
        }
        return visualizations

# 步骤7 主程序库整合

In [108]:
# ============================================================================
# 主程序库整合
# 负责：整合6个步骤的所有组件，提供统一接口
# ============================================================================

class TravelSurveyLibrary:
#旅行问卷调查程序库
    
    def __init__(self):
        # 步骤1: 爬虫问卷库
        self.crawler_bank = CrawlerQuestionBank()
        
        # 步骤2: 数据集成和预处理
        self.data_preprocessor = DataIntegrationPreprocessor()
        
        # 步骤3: 聚类分析
        self.cluster_analyzer = QuestionClusterAnalyzer(n_clusters=6)
        
        # 步骤4: 问题层次和递归关系
        self.hierarchy_builder = QuestionHierarchyBuilder(self.cluster_analyzer)
        
        # 步骤5: 个性化调查问卷实现
        self.survey_system = TravelSurveySystem()
        
        # 步骤6: 数据可视化
        self.visualization = DataVisualization()