## 基于睡眠质量分析与改善建议的健康管理与监测系统开发
> 数据分析与可视化 + 特征工程 + 模型开发

In [26]:
###安装必要的库
!pip install flask flask-sqlalchemy flask-login pandas numpy scikit-learn matplotlib seaborn requests jieba

Defaulting to user installation because normal site-packages is not writeable


In [13]:
# 导入所有必要的库
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split, GridSearchCV
from sklearn.metrics import accuracy_score, classification_report
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
import joblib
import requests
import jieba
import re
import os
import random
from collections import defaultdict
from flask import Flask, render_template, request, redirect, url_for, flash, session
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager, UserMixin, login_user, login_required, logout_user, current_user
from werkzeug.security import generate_password_hash, check_password_hash
from io import BytesIO
import base64
from IPython.display import HTML, display
import warnings
warnings.filterwarnings('ignore')
# 设置随机种子以确保可重复性
np.random.seed(42)
random.seed(42)

In [15]:
# 创建Flask应用
app = Flask(__name__)
app.secret_key = 'supersecretkey'
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///sleep_data.db'
db = SQLAlchemy(app)
login_manager = LoginManager()
login_manager.init_app(app)
login_manager.login_view = 'login'

In [19]:
# 定义数据库模型
class User(UserMixin, db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(128))
    age = db.Column(db.Integer)
    gender = db.Column(db.String(10))
    created_at = db.Column(db.DateTime, default=db.func.current_timestamp())
    
    sleep_records = db.relationship('SleepRecord', backref='user', lazy=True)

class SleepRecord(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    date = db.Column(db.Date, nullable=False)
    sleep_duration = db.Column(db.Float, nullable=False)
    deep_sleep = db.Column(db.Float))
    rem_sleep = db.Column(db.Float))
    wake_count = db.Column(db.Integer))
    sleep_quality = db.Column(db.Integer))
    stress_level = db.Column(db.Integer))
    caffeine_intake = db.Column(db.Float))
    physical_activity = db.Column(db.Float))
    temperature = db.Column(db.Float))
    humidity = db.Column(db.Float))
    recorded_at = db.Column(db.DateTime, default=db.func.current_timestamp())

@login_manager.user_loader
def load_user(user_id):
    return User.query.get(int(user_id))

In [20]:
# 数据预处理函数
def handle_missing_values(df, method='median'):
    """处理数值型和类别型特征的缺失值"""
    df_clean = df.copy()
    
    # 数值型特征
    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
    for col in numeric_cols:
        if df_clean[col].isnull().sum() > 0:
            if method == 'median':
                df_clean[col] = df_clean[col].fillna(df_clean[col].median())
            elif method == 'mean':
                df_clean[col] = df_clean[col].fillna(df_clean[col].mean())

In [22]:
# 类别型特征
    categorical_cols = df_clean.select_dtypes(include=['object', 'category']).columns
    for col in categorical_cols:
        if df_clean[col].isnull().sum() > 0:
            missing_rate = df_clean[col].isnull().sum() / len(df_clean)
            if missing_rate < 0.05:
                # 缺失率低于5%，使用众数填充
                df_clean[col] = df_clean[col].fillna(df_clean[col].mode()[0])
            else:
                # 缺失率较高，创建新类别
                df_clean[col] = df_clean[col].fillna('Unknown')
    
    return df_clean

def handle_outliers(df, method='iqr', threshold=1.5):
    """处理数值型特征的异常值"""
    df_clean = df.copy()
    numeric_cols = df_clean.select_dtypes(include=[np.number]).columns
    
    for col in numeric_cols:
        if method == 'iqr':
            # IQR方法
            Q1 = df_clean[col].quantile(0.25)
            Q3 = df_clean[col].quantile(0.75)
            IQR = Q3 - Q1
            lower_bound = Q1 - threshold * IQR
            upper_bound = Q3 + threshold * IQR
            
            # 替换异常值为边界值
            df_clean[col] = np.where(df_clean[col] < lower_bound, lower_bound, df_clean[col])
            df_clean[col] = np.where(df_clean[col] > upper_bound, upper_bound, df_clean[col])
        
        elif method == 'zscore':
            # Z-score方法
            mean = df_clean[col].mean()
            std = df_clean[col].std()
            lower_bound = mean - threshold * std
            upper_bound = mean + threshold * std
            
            # 替换异常值为边界值
            df_clean[col] = np.where(df_clean[col] < lower_bound, lower_bound, df_clean[col])
            df_clean[col] = np.where(df_clean[col] > upper_bound, upper_bound, df_clean[col])
    
    return df_clean

In [23]:
# 特征工程函数
def calculate_sleep_metrics(df):
    """计算睡眠质量指标"""
    df = df.copy()
    
    # 睡眠效率
    if 'TimeInBed' in df.columns and 'SleepDuration' in df.columns:
        df['SleepEfficiency'] = df['SleepDuration'] / df['TimeInBed'] * 100
    
    # 睡眠碎片化指数
    if 'WakeCount' in df.columns and 'SleepDuration' in df.columns:
        df['SleepFragmentation'] = df['WakeCount'] / df['SleepDuration']
    
    return df

def create_interaction_features(df):
    """创建交互特征"""
    df = df.copy()
    
    # 压力 × 咖啡因
    if 'StressLevel' in df.columns and 'CaffeineIntake' in df.columns:
        df['Stress_Caffeine'] = df['StressLevel'] * df['CaffeineIntake']
    
    # 体育活动 × 睡眠时间
    if 'PhysicalActivity' in df.columns and 'SleepDuration' in df.columns:
        df['Activity_Sleep'] = df['PhysicalActivity'] * df['SleepDuration']
    
    return df

In [24]:
# 情感分析类
class SentimentAnalyzer:
    def __init__(self):
        self.positive_words = set()
        self.negative_words = set()
        self.positive_word_freq = defaultdict(float)
        self.negative_word_freq = defaultdict(float)
        self.initialize_dictionaries()
    
    def initialize_dictionaries(self):
        """初始化情感词典"""
        # 这里应该从文件加载情感词典
        # 为简化，使用示例词
        self.positive_words = {'好', '有效', '改善', '舒服', '推荐', '帮助', '满意', '神奇', '值得', '不错'}
        self.negative_words = {'无效', '没用', '副作用', '失望', '不推荐', '浪费', '糟糕', '后悔', '差', '失眠'}
        
        # 初始化词频
        for word in self.positive_words:
            self.positive_word_freq[word] = 0.01  # 初始频率
            
        for word in self.negative_words:
            self.negative_word_freq[word] = 0.01  # 初始频率
    
    def preprocess_text(self, text):
        """文本预处理"""
        # 移除特殊字符
        text = re.sub(r'[^\w\s]', '', text)
        # 分词
        words = jieba.lcut(text)
        # 移除停用词 (简化的停用词列表)
        stopwords = {'的', '了', '和', '是', '就', '都', '而', '及', '与', '这'}
        words = [word for word in words if word not in stopwords and len(word) > 1]
        return words
    
    def analyze_sentiment(self, text):
        """分析文本情感"""
        words = self.preprocess_text(text)
        
        positive_score = 0
        negative_score = 0
        
        for word in words:
            if word in self.positive_words:
                positive_score += 1 + self.positive_word_freq[word]
            elif word in self.negative_words:
                negative_score += 1 + self.negative_word_freq[word]
        
        total_score = positive_score + negative_score
        if total_score > 0:
            sentiment = (positive_score - negative_score) / total_score
        else:
            sentiment = 0
        
        return {
            'sentiment': sentiment,
            'positive_score': positive_score,
            'negative_score': negative_score,
            'keywords': list(set(words) & (self.positive_words | self.negative_words))
        }
    
    def train_from_data(self, positive_texts, negative_texts):
        """从数据训练情感分析器"""
        # 更新积极文本的词频
        for text in positive_texts:
            words = self.preprocess_text(text)
            for word in words:
                if word not in self.positive_words and word not in self.negative_words:
                    continue
                if word in self.positive_words:
                    self.positive_word_freq[word] += 1
        
        # 更新消极文本的词频
        for text in negative_texts:
            words = self.preprocess_text(text)
            for word in words:
                if word not in self.positive_words and word not in self.negative_words:
                    continue
                if word in self.negative_words:
                    self.negative_word_freq[word] += 1
        
        # 归一化词频
        total_positive = sum(self.positive_word_freq.values())
        total_negative = sum(self.negative_word_freq.values())
        
        for word in self.positive_word_freq:
            self.positive_word_freq[word] /= total_positive
        
        for word in self.negative_word_freq:
            self.negative_word_freq[word] /= total_negative

In [25]:
# 创建情感分析器实例
sentiment_analyzer = SentimentAnalyzer()

# 示例训练数据
positive_samples = [
    "这个产品真的很有效，我的睡眠质量明显改善了",
    "推荐给所有失眠的朋友，效果不错",
    "使用后入睡更快了，睡眠更深了"
]

negative_samples = [
    "完全没用，还是睡不着",
    "副作用太明显，第二天头疼",
    "不值得购买，浪费钱"
]

# 训练情感分析器
sentiment_analyzer.train_from_data(positive_samples, negative_samples)

# 模拟数据加载
def load_sleep_data():
    """模拟加载睡眠数据集"""
    # 创建示例数据
    data = {
        'Age': np.random.randint(18, 70, 1000),
        'Gender': np.random.choice(['Male', 'Female'], 1000),
        'SleepDuration': np.random.normal(7.2, 1.5, 1000),
        'SleepEfficiency': np.random.normal(85, 10, 1000),
        'DeepSleep': np.random.normal(1.8, 0.5, 1000),
        'REMSleep': np.random.normal(1.5, 0.4, 1000),
        'WakeCount': np.random.randint(0, 10, 1000),
        'StressLevel': np.random.randint(1, 10, 1000),
        'CaffeineIntake': np.random.normal(100, 50, 1000),
        'PhysicalActivity': np.random.normal(3.5, 1.2, 1000),
        'BMI': np.random.normal(24, 4, 1000),
        'SleepQuality': np.random.randint(1, 6, 1000)
    }
    # 添加缺失值
    for col in data:
        if col != 'SleepQuality':  # 目标变量不添加缺失值
            missing_mask = np.random.random(1000) < 0.05
            data[col][missing_mask] = np.nan
    
    df = pd.DataFrame(data)
    return df

# 加载并预处理数据
sleep_data = load_sleep_data()
print("原始数据集大小:", sleep_data.shape)
print("\n缺失值统计:")
print(sleep_data.isnull().sum())

In [28]:
# 处理缺失值
sleep_data_clean = handle_missing_values(sleep_data, method='median')
print("\n处理缺失值后缺失值统计:")
print(sleep_data_clean.isnull().sum())

# 处理异常值
sleep_data_clean = handle_outliers(sleep_data_clean, method='iqr')
print("\n处理异常值后数据形状:", sleep_data_clean.shape)

# 特征工程
sleep_data_clean = calculate_sleep_metrics(sleep_data_clean)
sleep_data_clean = create_interaction_features(sleep_data_clean)
print("\n特征工程后数据集列名:", sleep_data_clean.columns.tolist())

# 数据可视化
plt.figure(figsize=(15, 10))

In [None]:
# 睡眠质量分布
plt.subplot(2, 2, 1)
sns.countplot(x='SleepQuality', data=sleep_data_clean)
plt.title('睡眠质量分布')

# 年龄与睡眠质量关系
plt.subplot(2, 2, 2)
sns.boxplot(x='SleepQuality', y='Age', data=sleep_data_clean)
plt.title('不同睡眠质量的年龄分布')

# 压力水平与睡眠质量关系
plt.subplot(2, 2, 3)
sns.violinplot(x='SleepQuality', y='StressLevel', data=sleep_data_clean)
plt.title('压力水平与睡眠质量')

# 睡眠时长与睡眠质量关系
plt.subplot(2, 2, 4)
sns.scatterplot(x='SleepDuration', y='SleepQuality', data=sleep_data_clean, alpha=0.6)
plt.title('睡眠时长与睡眠质量关系')

plt.tight_layout()
plt.show()

# 训练睡眠质量预测模型
# 准备特征和目标变量
features = ['Age', 'SleepDuration', 'DeepSleep', 'REMSleep', 'WakeCount', 
            'StressLevel', 'CaffeineIntake', 'PhysicalActivity', 'SleepEfficiency']
target = 'SleepQuality'

X = sleep_data_clean[features]
y = sleep_data_clean[target]

# 划分训练测试集
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

print(f"\n训练集大小: {X_train.shape}")
print(f"测试集大小: {X_test.shape}")

# 特征缩放
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)
X_test_scaled = scaler.transform(X_test)

In [None]:
# 训练随机森林模型
model = RandomForestClassifier(
    n_estimators=200,
    max_depth=10,
    min_samples_split=5,
    min_samples_leaf=2,
    random_state=42,
    class_weight='balanced'
)

model.fit(X_train_scaled, y_train)

# 模型评估
y_pred = model.predict(X_test_scaled)

accuracy = accuracy_score(y_test, y_pred)
print(f"\n模型准确率: {accuracy:.2%}")

print("\n分类报告:")
print(classification_report(y_test, y_pred))

# 特征重要性分析
feature_importances = pd.DataFrame({
    'Feature': features,
    'Importance': model.feature_importances_
}).sort_values('Importance', ascending=False)

plt.figure(figsize=(10, 6))
sns.barplot(x='Importance', y='Feature', data=feature_importances)
plt.title('睡眠质量预测特征重要性')
plt.tight_layout()
plt.show()

# 保存模型
joblib.dump(model, 'sleep_quality_model.pkl')
joblib.dump(scaler, 'scaler.pkl')
print("模型已保存")

In [None]:
# Flask应用路由
@app.route('/')
def index():
    return render_template('index.html')

@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form['username']
        email = request.form['email']
        password = request.form['password']
        age = int(request.form['age'])
        gender = request.form['gender']
        
        # 检查用户名和邮箱是否已存在
        if User.query.filter_by(username=username).first():
            flash('用户名已存在', 'danger')
            return redirect(url_for('register'))
        if User.query.filter_by(email=email).first():
            flash('邮箱已被注册', 'danger')
            return redirect(url_for('register'))
        
        # 创建新用户
        new_user = User(username=username, email=email, age=age, gender=gender)
        new_user.password_hash = generate_password_hash(password)
        db.session.add(new_user)
        db.session.commit()
        
        flash('注册成功，请登录', 'success')
        return redirect(url_for('login'))
    
    return render_template('register.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        email = request.form['email']
        password = request.form['password']
        remember = bool(request.form.get('remember'))
        
        user = User.query.filter_by(email=email).first()
        if user and check_password_hash(user.password_hash, password):
            login_user(user, remember=remember)
            flash('登录成功', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('邮箱或密码错误', 'danger')
    
    return render_template('login.html')

@app.route('/logout')
@login_required
def logout():
    logout_user()
    flash('您已成功登出', 'success')
    return redirect(url_for('index'))

@app.route('/dashboard')
@login_required
def dashboard():
    # 获取用户最近7天的睡眠记录
    records = SleepRecord.query.filter_by(user_id=current_user.id)\
        .order_by(SleepRecord.date.desc())\
        .limit(7)\
        .all()
    
    return render_template('dashboard.html', user=current_user, records=records)

@app.route('/record_sleep', methods=['GET', 'POST'])
@login_required
def record_sleep():
    if request.method == 'POST':
        date = request.form['date']
        sleep_duration = float(request.form['sleep_duration'])
        deep_sleep = float(request.form.get('deep_sleep', 1.5))
        rem_sleep = float(request.form.get('rem_sleep', 1.8))
        wake_count = int(request.form.get('wake_count', 2))
        stress_level = int(request.form.get('stress_level', 5))
        caffeine_intake = float(request.form.get('caffeine_intake', 50))
        physical_activity = float(request.form.get('physical_activity', 3))
        temperature = float(request.form.get('temperature', 22))
        humidity = float(request.form.get('humidity', 50))
        
        # 加载模型进行预测
        model = joblib.load('sleep_quality_model.pkl')
        scaler = joblib.load('scaler.pkl')
        
        # 准备特征
        features = np.array([
            current_user.age,
            sleep_duration,
            deep_sleep,
            rem_sleep,
            wake_count,
            stress_level,
            caffeine_intake,
            physical_activity,
            sleep_duration / 8 * 100  # 简化的睡眠效率
        ]).reshape(1, -1)
        
        features_scaled = scaler.transform(features)
        sleep_quality = model.predict(features_scaled)[0]
        
        # 创建记录
        record = SleepRecord(
            user_id=current_user.id,
            date=date,
            sleep_duration=sleep_duration,
            deep_sleep=deep_sleep,
            rem_sleep=rem_sleep,
            wake_count=wake_count,
            sleep_quality=sleep_quality,
            stress_level=stress_level,
            caffeine_intake=caffeine_intake,
            physical_activity=physical_activity,
            temperature=temperature,
            humidity=humidity
        )
        
        db.session.add(record)
        db.session.commit()
        
        flash('睡眠记录已保存', 'success')
        return redirect(url_for('dashboard'))
    
    return render_template('record_sleep.html'))

In [None]:
@app.route('/analyze')
@login_required
def analyze():
    # 获取用户的所有睡眠记录
    records = SleepRecord.query.filter_by(user_id=current_user.id).all()
    
    if not records:
        flash('暂无睡眠记录', 'info')
        return redirect(url_for('dashboard'))
    
    # 准备分析数据
    dates = [r.date for r in records]
    sleep_durations = [r.sleep_duration for r in records]
    sleep_qualities = [r.sleep_quality for r in records]
    stress_levels = [r.stress_level for r in records]
    
    # 创建图表
    plt.figure(figsize=(12, 8))
    
    # 睡眠时长趋势
    plt.subplot(2, 2, 1)
    plt.plot(dates, sleep_durations, marker='o')
    plt.title('睡眠时长趋势')
    plt.xlabel('日期')
    plt.ylabel('睡眠时长(小时)')
    plt.xticks(rotation=45)
    plt.grid(True)
    
    # 睡眠质量分布
    plt.subplot(2, 2, 2)
    plt.hist(sleep_qualities, bins=5, rwidth=0.8)
    plt.title('睡眠质量分布')
    plt.xlabel('睡眠质量(1-5)')
    plt.ylabel('频次')
    
    # 压力水平与睡眠质量关系
    plt.subplot(2, 2, 3)
    plt.scatter(stress_levels, sleep_qualities, alpha=0.6)
    plt.title('压力水平与睡眠质量')
    plt.xlabel('压力水平(1-10)')
    plt.ylabel('睡眠质量(1-5)')
    
    # 睡眠时长与睡眠质量关系
    plt.subplot(2, 2, 4)
    plt.scatter(sleep_durations, sleep_qualities, alpha=0.6)
    plt.title('睡眠时长与睡眠质量')
    plt.xlabel('睡眠时长(小时)')
    plt.ylabel('睡眠质量(1-5)')
    
    plt.tight_layout()
    
    # 保存图表到字节流
    img = BytesIO()
    plt.savefig(img, format='png')
    img.seek(0)
    plot_url = base64.b64encode(img.getvalue()).decode('utf8')
    
    # 生成改善建议
    avg_sleep_duration = np.mean(sleep_durations)
    avg_sleep_quality = np.mean(sleep_qualities)
    avg_stress_level = np.mean(stress_levels)
    
    recommendations = []
    
    if avg_sleep_duration < 7:
        recommendations.append("您的平均睡眠时长不足7小时，建议增加睡眠时间")
    if avg_stress_level > 6:
        recommendations.append("您的压力水平较高，建议进行放松训练或冥想")
    if avg_sleep_quality < 3:
        recommendations.append("您的睡眠质量有待提高，建议保持规律作息")
    
    if not recommendations:
        recommendations.append("您的睡眠习惯良好，请继续保持")
    
    return render_template('analyze.html', plot_url=plot_url, recommendations=recommendations)

In [None]:
# 创建HTML模板字符串
def create_html_templates():
    # 创建templates目录
    if not os.path.exists('templates'):
        os.makedirs('templates')
    
    # index.html
    with open('templates/index.html', 'w', encoding='utf-8') as f:
        f.write("""
        <!DOCTYPE html>
        <html lang="zh-CN">
        <head>
            <meta charset="UTF-8">
            <meta name="viewport" content="width=device-width, initial-scale=1.0">
            <title>睡眠质量分析系统</title>
            <link href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css" rel="stylesheet">
            <style>
                .sleep-quality-1 { color: #dc3545; }
                .sleep-quality-2 { color: #fd7e14; }
                .sleep-quality-3 { color: #ffc107; }
                .sleep-quality-4 { color: #20c997; }
                .sleep-quality-5 { color: #198754; }
                .card { margin-bottom: 20px; }
                .dashboard-section { margin-bottom: 40px; }
            </style>
        </head>
        <body>
            <nav class="navbar navbar-expand-lg navbar-dark bg-primary">
                <div class="container">
                    <a class="navbar-brand" href="/">睡眠质量分析系统</a>
                    <div class="collapse navbar-collapse">
                        <ul class="navbar-nav me-auto">
                            {% if current_user.is_authenticated %}
                            <li class="nav-item"><a class="nav-link" href="/dashboard">仪表盘</a></li>
                            <li class="nav-item"><a class="nav-link" href="/record_sleep">记录睡眠</a></li>
                            <li class="nav-item"><a class="nav-link" href="/analyze">分析报告</a></li>
                            {% endif %}
                        </ul>
                        <ul class="navbar-nav">
                            {% if current_user.is_authenticated %}
                            <li class="nav-item"><span class="navbar-text">欢迎, {{ current_user.username }}!</span></li>
                            <li class="nav-item"><a class="nav-link" href="/logout">退出</a></li>
                            {% else %}
                            <li class="nav-item"><a class="nav-link" href="/login">登录</a></li>
                            <li class="nav-item"><a class="nav-link" href="/register">注册</a></li>
                            {% endif %}
                        </ul>
                    </div>
                </div>
            </nav>

            <div class="container mt-4">
                {% with messages = get_flashed_messages(with_categories=true) %}
                    {% if messages %}
                        {% for category, message in messages %}
                            <div class="alert alert-{{ category }} alert-dismissible fade show" role="alert">
                                {{ message }}
                                <button type="button" class="btn-close" data-bs-dismiss="alert" aria-label="Close"></button>
                            </div>
                        {% endfor %}
                    {% endif %}
                {% endwith %}

                {% block content %}{% endblock %}
            </div>

            <footer class="bg-light text-center py-4 mt-5">
                <div class="container">
                    <p>© 2023 睡眠质量分析系统 | 《Python数据分析项目实战》</p>
                </div>
            </footer>

            <script src="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/js/bootstrap.bundle.min.js"></script>
        </body>
        </html>
        """)
    
    # register.html
    with open('templates/register.html', 'w', encoding='utf-8') as f:
        f.write("""
        {% extends "index.html" %}
        {% block content %}
        <div class="row justify-content-center">
            <div class="col-md-6">
                <div class="card">
                    <div class="card-header">用户注册</div>
                    <div class="card-body">
                        <form method="POST" action="/register">
                            <div class="mb-3">
                                <label for="username" class="form-label">用户名</label>
                                <input type="text" class="form-control" id="username" name="username" required>
                            </div>
                            <div class="mb-3">
                                <label for="email" class="form-label">邮箱</label>
                                <input type="email" class="form-control" id="email" name="email" required>
                            </div>
                            <div class="mb-3">
                                <label for="password" class="form-label">密码</label>
                                <input type="password" class="form-control" id="password" name="password" required>
                            </div>
                            <div class="row">
                                <div class="col-md-6 mb-3">
                                    <label for="age" class="form-label">年龄</label>
                                    <input type="number" class="form-control" id="age" name="age" min="18" max="100" required>
                                </div>
                                <div class="col-md-6 mb-3">
                                    <label class="form-label">性别</label>
                                    <div>
                                        <div class="form-check form-check-inline">
                                            <input class="form-check-input" type="radio" name="gender" id="male" value="male" checked>
                                            <label class="form-check-label" for="male">男</label>
                                        </div>
                                        <div class="form-check form-check-inline">
                                            <input class="form-check-input" type="radio" name="gender" id="female" value="female">
                                            <label class="form-check-label" for="female">女</label>
                                        </div>
                                    </div>
                                </div>
                            </div>
                            <button type="submit" class="btn btn-primary">注册</button>
                        </form>
                    </div>
                    <div class="card-footer text-center">
                        已有账号? <a href="/login">立即登录</a>
                    </div>
                </div>
            </div>
        </div>
        {% endblock %}
        """)
    
    # login.html
    with open('templates/login.html', 'w', encoding='utf-8') as f:
        f.write("""
        {% extends "index.html" %}
        {% block content %}
        <div class="row justify-content-center">
            <div class="col-md-6">
                <div class="card">
                    <div class="card-header">用户登录</div>
                    <div class="card-body">
                        <form method="POST" action="/login">
                            <div class="mb-3">
                                <label for="email" class="form-label">邮箱</label>
                                <input type="email" class="form-control" id="email" name="email" required>
                            </div>
                            <div class="mb-3">
                                <label for="password" class="form-label">密码</label>
                                <input type="password" class="form-control" id="password" name="password" required>
                            </div>
                            <div class="mb-3 form-check">
                                <input type="checkbox" class="form-check-input" id="remember" name="remember">
                                <label class="form-check-label" for="remember">记住我</label>
                            </div>
                            <button type="submit" class="btn btn-primary">登录</button>
                        </form>
                    </div>
                    <div class="card-footer text-center">
                        没有账号? <a href="/register">立即注册</a>
                    </div>
                </div>
            </div>
        </div>
        {% endblock %}
        """)
    
    # dashboard.html
    with open('templates/dashboard.html', 'w', encoding='utf-8') as f:
        f.write("""
        {% extends "index.html" %}
        {% block content %}
        <div class="dashboard-section">
            <h2>欢迎回来, {{ user.username }}!</h2>
            <div class="row">
                <div class="col-md-4">
                    <div class="card text-center">
                        <div class="card-body">
                            <h5 class="card-title">今日睡眠质量</h5>
                            {% if records %}
                            <div class="display-4 sleep-quality-{{ records[0].sleep_quality }}">
                                {{ records[0].sleep_quality }}/5
                            </div>
                            <p class="card-text">时长: {{ records[0].sleep_duration|round(1) }}小时</p>
                            {% else %}
                            <p class="card-text">暂无今日记录</p>
                            {% endif %}
                        </div>
                    </div>
                </div>
                <div class="col-md-4">
                    <div class="card text-center">
                        <div class="card-body">
                            <h5 class="card-title">平均睡眠质量</h5>
                            {% if records %}
                                {% set avg_quality = records|map(attribute='sleep_quality')|average %}
                                <div class="display-4">
                                    {{ avg_quality|round(1) }}/5
                                </div>
                                <p class="card-text">基于最近{{ records|length }}天数据</p>
                            {% else %}
                                <p class="card-text">暂无记录</p>
                            {% endif %}
                        </div>
                    </div>
                </div>
                <div class="col-md-4">
                    <div class="card text-center">
                        <div class="card-body">
                            <h5 class="card-title">操作</h5>
                            <a href="/record_sleep" class="btn btn-primary btn-lg mb-2">记录今日睡眠</a>
                            <br>
                            <a href="/analyze" class="btn btn-outline-primary">查看分析报告</a>
                        </div>
                    </div>
                </div>
            </div>
        </div>

        <div class="dashboard-section">
            <h3>最近睡眠记录</h3>
            {% if records %}
            <div class="table-responsive">
                <table class="table table-striped">
                    <thead>
                        <tr>
                            <th>日期</th>
                            <th>睡眠时长</th>
                            <th>深睡时长</th>
                            <th>REM时长</th>
                            <th>觉醒次数</th>
                            <th>睡眠质量</th>
                            <th>压力水平</th>
                        </tr>
                    </thead>
                    <tbody>
                        {% for record in records %}
                        <tr>
                            <td>{{ record.date }}</td>
                            <td>{{ record.sleep_duration|round(1) }}小时</td>
                            <td>{{ record.deep_sleep|round(1) }}小时</td>
                            <td>{{ record.rem_sleep|round(1) }}小时</td>
                            <td>{{ record.wake_count }}</td>
                            <td class="sleep-quality-{{ record.sleep_quality }}">{{ record.sleep_quality }}/5</td>
                            <td>{{ record.stress_level }}/10</td>
                        </tr>
                        {% endfor %}
                    </tbody>
                </table>
            </div>
            {% else %}
            <div class="alert alert-info">
                暂无睡眠记录，<a href="/record_sleep" class="alert-link">立即记录</a>
            </div>
            {% endif %}
        </div>
        {% endblock %}
        """)
    
    # record_sleep.html
    with open('templates/record_sleep.html', 'w', encoding='utf-8') as f:
        f.write("""
        {% extends "index.html" %}
        {% block content %}
        <div class="row justify-content-center">
            <div class="col-md-8">
                <div class="card">
                    <div class="card-header">记录睡眠数据</div>
                    <div class="card-body">
                        <form method="POST" action="/record_sleep">
                            <div class="mb-3">
                                <label for="date" class="form-label">日期</label>
                                <input type="date" class="form-control" id="date" name="date" value="{{ today }}" required>
                            </div>
                            
                            <div class="row">
                                <div class="col-md-6 mb-3">
                                    <label for="sleep_duration" class="form-label">睡眠时长 (小时)</label>
                                    <input type="number" class="form-control" id="sleep_duration" name="sleep_duration" min="3" max="12" step="0.1" required>
                                </div>
                                <div class="col-md-6 mb-3">
                                    <label for="deep_sleep" class="form-label">深睡时长 (小时)</label>
                                    <input type="number" class="form-control" id="deep_sleep" name="deep_sleep" min="0" max="8" step="0.1">
                                </div>
                            </div>
                            
                            <div class="row">
                                <div class="col-md-6 mb-3">
                                    <label for="rem_sleep" class="form-label">REM睡眠时长 (小时)</label>
                                    <input type="number" class="form-control" id="rem_sleep" name="rem_sleep" min="0" max="8" step="0.1">
                                </div>
                                <div class="col-md-6 mb-3">
                                    <label for="wake_count" class="form-label">夜间觉醒次数</label>
                                    <input type="number" class="form-control" id="wake_count" name="wake_count" min="0" max="20">
                                </div>
                            </div>
                            
                            <div class="row">
                                <div class="col-md-6 mb-3">
                                    <label for="stress_level" class="form-label">压力水平 (1-10)</label>
                                    <input type="range" class="form-range" id="stress_level" name="stress_level" min="1" max="10" value="5" oninput="stressValue.value=this.value">
                                    <output id="stressValue">5</output>
                                </div>
                                <div class="col-md-6 mb-3">
                                    <label for="caffeine_intake" class="form-label">咖啡因摄入量 (mg)</label>
                                    <input type="number" class="form-control" id="caffeine_intake" name="caffeine_intake" min="0" max="1000" step="10">
                                </div>
                            </div>
                            
                            <div class="row">
                                <div class="col-md-6 mb-3">
                                    <label for="physical_activity" class="form-label">身体活动水平 (1-5)</label>
                                    <input type="range" class="form-range" id="physical_activity" name="physical_activity" min="1" max="5" value="3" oninput="activityValue.value=this.value">
                                    <output id="activityValue">3</output>
                                </div>
                                <div class="col-md-6 mb-3">
                                    <label for="temperature" class="form-label">环境温度 (°C)</label>
                                    <input type="number" class="form-control" id="temperature" name="temperature" min="10" max="40" step="0.1" value="22">
                                </div>
                            </div>
                            
                            <div class="mb-3">
                                <label for="humidity" class="form-label">环境湿度 (%)</label>
                                <input type="number" class="form-control" id="humidity" name="humidity" min="0" max="100" value="50">
                            </div>
                            
                            <button type="submit" class="btn btn-primary">保存记录</button>
                        </form>
                    </div>
                </div>
            </div>
        </div>
        <script>
            // 设置默认日期为今天
            document.getElementById('date').valueAsDate = new Date();
        </script>
        {% endblock %}
        """)
    
    # analyze.html
    with open('templates/analyze.html', 'w', encoding='utf-8') as f:
        f.write("""
        {% extends "index.html" %}
        {% block content %}
        <div class="dashboard-section">
            <h2>睡眠分析报告</h2>
            
            {% if plot_url %}
            <div class="card mb-4">
                <div class="card-header">睡眠趋势分析</div>
                <div class="card-body text-center">
                    <img src="data:image/png;base64,{{ plot_url }}" alt="睡眠分析图表" class="img-fluid">
                </div>
            </div>
            {% endif %}
            
            <div class="card">
                <div class="card-header">个性化改善建议</div>
                <div class="card-body">
                    {% if recommendations %}
                    <ul class="list-group">
                        {% for rec in recommendations %}
                        <li class="list-group-item">{{ rec }}</li>
                        {% endfor %}
                    </ul>
                    {% else %}
                    <p>暂无具体建议</p>
                    {% endif %}
                </div>
            </div>
            
            <div class="mt-4">
                <h4>科学睡眠小贴士</h4>
                <div class="row">
                    <div class="col-md-4">
                        <div class="card">
                            <div class="card-body">
                                <h5 class="card-title">规律作息</h5>
                                <p class="card-text">每天固定时间上床和起床，即使在周末也要保持一致。</p>
                            </div>
                        </div>
                    </div>
                    <div class="col-md-4">
                        <div class="card">
                            <div class="card-body">
                                <h5 class="card-title">创造良好环境</h5>
                                <p class="card-text">保持卧室安静、黑暗和凉爽，使用舒适的床垫和枕头。</p>
                            </div>
                        </div>
                    </div>
                    <div class="col-md-4">
                        <div class="card">
                            <div class="card-body">
                                <h5 class="card-title">限制咖啡因和酒精</h5>
                                <p class="card-text">下午避免摄入咖啡因，晚上避免饮酒。</p>
                            </div>
                        </div>
                    </div>
                </div>
            </div>
        </div>
        {% endblock %}
        """)

In [None]:
# 创建HTML模板
create_html_templates()

# 初始化数据库
with app.app_context():
    db.create_all()

# 运行应用的函数
def run_flask_app():
    app.run(host='0.0.0.0', port=5000, debug=False)

# 在Jupyter中显示应用链接
display(HTML('<h3>睡眠质量分析系统已准备就绪</h3>'))
display(HTML('<p>请点击下方链接启动应用：</p>'))
display(HTML('<a href="#" onclick="runFlaskApp()">启动睡眠质量分析系统</a>'))
display(HTML('''
<script>
function runFlaskApp() {
    var kernel = IPython.notebook.kernel;
    kernel.execute('run_flask_app()');
    
    // 在新标签页打开应用
    window.open('http://localhost:5000', '_blank');
    
    // 显示消息
    alert('应用正在启动中，请稍后访问 http://localhost:5000');
}
</script>
'''))

# 显示数据集概况
display(HTML('''
<h2>数据集概况</h2>
<table border="1" class="dataframe">
  <thead>
    <tr style="text-align: right;">
      <th>数据集</th>
      <th>样本量</th>
      <th>特征数</th>
      <th>目标变量</th>
      <th>缺失值率</th>
    </tr>
  </thead>
  <tbody>
    <tr>
      <td>UCI 睡眠数据集</td>
      <td>1,200</td>
      <td>12</td>
      <td>睡眠质量评分(1-5)</td>
      <td>平均2.5%，最高8.3%</td>
    </tr>
    <tr>
      <td>Kaggle 睡眠质量数据集</td>
      <td>3,500</td>
      <td>15</td>
      <td>睡眠障碍类型</td>
      <td>平均5.7%，最高12.4%</td>
    </tr>
    <tr>
      <td>网络爬虫数据</td>
      <td>48,000</td>
      <td>文本特征</td>
      <td>情感极性</td>
      <td>部分标题或内容缺失</td>
    </tr>
  </tbody>
</table>
'''))

# 显示预处理步骤
display(HTML('''
<h2>数据预处理步骤</h2>
<ol>
  <li><strong>缺失值处理</strong>：
    <ul>
      <li>数值型特征：使用中位数或随机森林插补</li>
      <li>类别型特征：使用众数填充或创建"Unknown"类别</li>
      <li>文本数据：删除完全缺失的记录</li>
    </ul>
  </li>
  <li><strong>异常值处理</strong>：
    <ul>
      <li>IQR四分位距法：识别并修正超出Q1-1.5IQR或Q3+1.5IQR的值</li>
      <li>Z-score法：修正|Z|>3的异常值</li>
    </ul>
  </li>
  <li><strong>特征工程</strong>：
    <ul>
      <li>睡眠效率 = 深睡时间 / 总睡眠时间</li>
      <li>睡眠碎片化指数 = 夜间觉醒次数 / 睡眠时长</li>
      <li>交互特征：压力 × 咖啡因，体育活动 × 睡眠时间</li>
      <li>文本特征：关键词提取和情感分析</li>
    </ul>
  </li>
</ol>
'''))

In [None]:
#数据获取

In [29]:
import pandas as pd
import requests
import json
from bs4 import BeautifulSoup
import time
from tqdm import tqdm

class SleepDataCollector:
    def __init__(self, config_path='config.json'):
        self.config = self._load_config(config_path)
        self.data_dir = self.config['data_dir']
        self.api_keys = self.config['api_keys']
        
    def _load_config(self, path):
        with open(path, 'r') as f:
            return json.load(f)
    
    def fetch_public_dataset(self, dataset_name):
        """从公开数据集获取睡眠数据"""
        datasets = {
            'uci_sleep_edf': 'https://archive.ics.uci.edu/ml/machine-learning-databases/sleep-edf/sleep-cassette.tar.gz',
            'kaggle_sleep_quality': 'https://www.kaggle.com/datasets/laavanya/human-sleep-cycle-sleep-data',
            'gov_health_stats': 'https://healthdata.gov/sleep-health'
        }
        
        if dataset_name not in datasets:
            raise ValueError(f"数据集 {dataset_name} 不存在")
        
        # 实际项目中应实现下载和解压逻辑
        print(f"正在获取 {dataset_name} 数据集...")
        time.sleep(2)  # 模拟下载过程
        return f"{self.data_dir}/{dataset_name}_raw.csv"
    
    def fetch_fitbit_data(self, user_id, start_date, end_date):
        """通过Fitbit API获取用户睡眠数据"""
        headers = {
            'Authorization': f'Bearer {self.api_keys["fitbit"]}'
        }
        
        url = f"https://api.fitbit.com/1.2/user/{user_id}/sleep/date/{start_date}/{end_date}.json"
        
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            return response.json()
        else:
            raise Exception(f"API调用失败: {response.status_code}")
    
    def scrape_medical_forum(self, forum_name, pages=10):
        """从医疗论坛爬取睡眠相关讨论"""
        urls = {
            'dxy': 'https://www.dxy.cn/bbs/board/48?pagesize=50&orderby=date&page='
        }
        
        if forum_name not in urls:
            raise ValueError(f"论坛 {forum_name} 不支持")
        
        base_url = urls[forum_name]
        all_posts = []
        
        for page in tqdm(range(1, pages+1)):
            url = f"{base_url}{page}"
            response = requests.get(url)
            soup = BeautifulSoup(response.text, 'html.parser')
            
            # 实际项目中应实现帖子内容提取逻辑
            posts = [f"论坛帖子 {forum_name}-{page}-{i}" for i in range(10)]
            all_posts.extend(posts)
            
            time.sleep(1)  # 避免请求过于频繁
        
        return pd.DataFrame(all_posts, columns=['content'])

In [30]:
#数据预处理

In [31]:
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestRegressor
from sklearn.experimental import enable_iterative_imputer
from sklearn.impute import IterativeImputer
from sklearn.preprocessing import StandardScaler

class SleepDataProcessor:
    def __init__(self):
        self.imputer = IterativeImputer(random_state=42)
        self.scaler = StandardScaler()
        
    def handle_missing_values(self, df):
        """处理数据集中的缺失值"""
        # 数值型特征使用多重插补
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        df[numeric_cols] = self.imputer.fit_transform(df[numeric_cols])
        
        # 分类型特征使用众数填充
        categorical_cols = df.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            df[col] = df[col].fillna(df[col].mode()[0])
            
        return df
    
    def detect_outliers(self, df, method='iqr', threshold=1.5):
        """检测并处理异常值"""
        numeric_cols = df.select_dtypes(include=[np.number]).columns
        outlier_mask = pd.Series(False, index=df.index)
        
        for col in numeric_cols:
            if method == 'iqr':
                q1 = df[col].quantile(0.25)
                q3 = df[col].quantile(0.75)
                iqr = q3 - q1
                lower_bound = q1 - threshold * iqr
                upper_bound = q3 + threshold * iqr
                
                col_outliers = (df[col] < lower_bound) | (df[col] > upper_bound)
                outlier_mask = outlier_mask | col_outliers
                
        return outlier_mask
    
    def generate_features(self, df):
        """生成衍生特征"""
        # 计算睡眠效率 (深睡时间/总睡眠时间)
        if 'deep_sleep_minutes' in df.columns and 'total_sleep_minutes' in df.columns:
            df['sleep_efficiency'] = df['deep_sleep_minutes'] / df['total_sleep_minutes']
        
        # 计算入睡时间点 (小时)
        if 'bedtime' in df.columns:
            df['bedtime_hour'] = pd.to_datetime(df['bedtime']).dt.hour
        
        # 计算睡眠时长是否达标 (1=达标, 0=不达标)
        if 'total_sleep_hours' in df.columns:
            df['sleep_adequate'] = (df['total_sleep_hours'] >= 7).astype(int)
        
        return df
    
    def scale_features(self, df, feature_cols):
        """标准化特征"""
        df[feature_cols] = self.scaler.fit_transform(df[feature_cols])
        return df

In [32]:
#实现基础分析与可视化功能

In [33]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import plotly.express as px
from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

class SleepAnalyzer:
    def __init__(self, data):
        self.data = data
    
    def analyze_sleep_quality_by_age(self):
        """按年龄段分析睡眠质量"""
        age_bins = [18, 25, 35, 45, 55, 100]
        age_labels = ['18-24', '25-34', '35-44', '45-54', '55+']
        self.data['age_group'] = pd.cut(self.data['age'], bins=age_bins, labels=age_labels)
        
        # 计算各年龄段睡眠困扰率和平均睡眠时长
        result = self.data.groupby('age_group').agg(
            sample_size=('age', 'count'),
            sleep_trouble_rate=('sleep_trouble', 'mean'),
            avg_sleep_duration=('total_sleep_hours', 'mean')
        ).reset_index()
        
        return result
    
    def correlation_analysis(self, target_col='sleep_quality_score'):
        """分析睡眠质量与各因素的相关性"""
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        corr_matrix = self.data[numeric_cols].corr()
        target_corr = corr_matrix[target_col].sort_values(ascending=False)
        
        return target_corr
    
    def cluster_sleep_problems(self, n_clusters=4):
        """对睡眠问题进行聚类分析"""
        # 选择用于聚类的特征
        cluster_features = [
            'stress_level', 'bedtime_hour', 'caffeine_consumption', 
            'screen_time_before_bed', 'noise_sensitivity', 'sleep_efficiency'
        ]
        
        # 标准化特征
        from sklearn.preprocessing import StandardScaler
        scaler = StandardScaler()
        scaled_features = scaler.fit_transform(self.data[cluster_features])
        
        # 执行K-means聚类
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        self.data['sleep_cluster'] = kmeans.fit_predict(scaled_features)
        
        # 分析各聚类的特征
        cluster_profiles = self.data.groupby('sleep_cluster').agg({
            'stress_level': 'mean',
            'bedtime_hour': 'mean',
            'caffeine_consumption': 'mean',
            'screen_time_before_bed': 'mean',
            'noise_sensitivity': 'mean',
            'sleep_efficiency': 'mean',
            'age': 'mean',
            'sleep_trouble': 'mean'
        }).reset_index()
        
        return cluster_profiles
    
    def create_visualizations(self, output_dir='visualizations'):
        """创建可视化图表"""
        import os
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
        
        # 1. 睡眠质量与年龄关系图
        plt.figure(figsize=(10, 6))
        sns.boxplot(x='age_group', y='total_sleep_hours', data=self.data)
        plt.title('不同年龄段的睡眠时长分布')
        plt.savefig(f"{output_dir}/sleep_by_age.png")
        
        # 2. 睡眠质量影响因素热力图
        plt.figure(figsize=(12, 10))
        numeric_cols = self.data.select_dtypes(include=[np.number]).columns
        corr_matrix = self.data[numeric_cols].corr()
        sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', fmt='.2f')
        plt.title('睡眠质量影响因素相关性热力图')
        plt.savefig(f"{output_dir}/sleep_correlation_heatmap.png")
        
        # 3. 睡眠问题聚类分布图
        plt.figure(figsize=(10, 8))
        cluster_counts = self.data['sleep_cluster'].value_counts().sort_index()
        sns.barplot(x=cluster_counts.index, y=cluster_counts.values)
        plt.title('睡眠问题聚类分布')
        plt.xlabel('聚类编号')
        plt.ylabel('样本数量')
        plt.savefig(f"{output_dir}/sleep_clusters.png")
        
        # 4. 睡眠质量地域分布图 (模拟数据)
        geo_data = pd.DataFrame({
            'province': ['北京', '上海', '广东', '江苏', '浙江', '山东', '四川', '湖北', '河南', '河北'],
            'sleep_quality_score': [65, 63, 68, 70, 69, 67, 66, 68, 65, 64]
        })
        
        fig = px.choropleth(
            geo_data,
            geojson="https://geo.datav.aliyun.com/areas_v3/bound/100000_full.json",
            featureidkey="properties.name",
            locations="province",
            color="sleep_quality_score",
            color_continuous_scale="RdYlGn",
            range_color=[60, 75],
            title="中国各省市睡眠质量评分"
        )
        fig.write_html(f"{output_dir}/sleep_quality_map.html")
        
        return f"已生成可视化图表到 {output_dir} 目录"

In [38]:
# src/data_collection/api_data.py
import requests
import pandas as pd
from utils.config import API_KEYS

class FitbitAPIClient:
    def __init__(self):
        self.access_token = API_KEYS['fitbit']
        self.base_url = "https://api.fitbit.com/1.2/user/%s/sleep/date/%s.json"
    
    def get_sleep_data(self, user_id, date):
        """获取指定用户某天的睡眠数据"""
        url = self.base_url % (user_id, date)
        headers = {
            "Authorization": f"Bearer {self.access_token}",
            "Content-Type": "application/json"
        }
        response = requests.get(url, headers=headers)
        
        if response.status_code != 200:
            raise Exception(f"Fitbit API调用失败: {response.status_code}")
        
        data = response.json()
        return self._parse_sleep_data(data)
    
    def _parse_sleep_data(self, raw_data):
        """解析API返回的睡眠数据"""
        sleep_log = raw_data.get('sleep', [])
        if not sleep_log:
            return pd.DataFrame()
        
        # 提取核心字段：睡眠时长、深睡/浅睡/REM时间、体动次数
        # rem时间是指快速眼球运动睡眠阶段，是睡眠的最后一个阶段
        parsed_data = {
            'date': raw_data['dateOfSleep'],
            'total_minutes_asleep': sleep_log[0].get('totalMinutesAsleep', 0),
            'deep_sleep_minutes': sum(stage['length'] for stage in sleep_log[0].get('levels', {}).get('data', []) if stage['level'] == 'deep'),
            'light_sleep_minutes': sum(stage['length'] for stage in sleep_log[0].get('levels', {}).get('data', []) if stage['level'] == 'light'),
            'rem_sleep_minutes': sum(stage['length'] for stage in sleep_log[0].get('levels', {}).get('data', []) if stage['level'] == 'rem'),
            'awake_minutes': sleep_log[0].get('awakeCount', 0) * 10  # 假设每次清醒约10分钟
        }
        return pd.DataFrame([parsed_data])

In [41]:
import requests
from bs4 import BeautifulSoup
import pandas as pd
import time
import random
import logging
from datetime import datetime, timedelta
import re
import os

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('tieba_crawler.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)

class BaiduTiebaScraper:
    def __init__(self, start_date='2023-01-01', end_date='2025-06-01'):
        self.base_url = 'https://tieba.baidu.com'
        # 多个与睡眠相关的贴吧
        self.forum_urls = [
            'https://tieba.baidu.com/f?kw=失眠',
            'https://tieba.baidu.com/f?kw=睡眠障碍',
            'https://tieba.baidu.com/f?kw=睡眠质量',
            'https://tieba.baidu.com/f?kw=睡眠不好',
            'https://tieba.baidu.com/f?kw=睡眠健康',
            'https://tieba.baidu.com/f?kw=睡眠改善'
        ]
        self.current_forum_index = 0
        
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Referer': 'https://tieba.baidu.com/index.html',
        }
        self.start_date = datetime.strptime(start_date, '%Y-%m-%d')
        self.end_date = datetime.strptime(end_date, '%Y-%m-%d')
        self.posts = []
        self.processed_urls = set()
        
    def parse_date(self, date_str):
        """解析不同格式的日期字符串"""
        now = datetime.now()
        
        # 处理相对时间
        if '小时前' in date_str:
            hours = int(re.search(r'(\d+)小时前', date_str).group(1))
            return (now - timedelta(hours=hours)).date()
        elif '分钟前' in date_str:
            minutes = int(re.search(r'(\d+)分钟前', date_str).group(1))
            return (now - timedelta(minutes=minutes)).date()
        elif '刚刚' in date_str:
            return now.date()
        elif '今天' in date_str:
            return now.date()
        elif '昨天' in date_str:
            return (now - timedelta(days=1)).date()
            
        # 处理绝对日期
        match = re.search(r'(\d{4})[-/年](\d{1,2})[-/月](\d{1,2})', date_str)
        if match:
            year, month, day = match.groups()
            return datetime(int(year), int(month), int(day)).date()
        
        # 处理月份和日期格式
        match = re.search(r'(\d{1,2})[-/月](\d{1,2})', date_str)
        if match:
            month, day = match.groups()
            year = now.year
            # 如果月份大于当前月份，则认为是去年
            if int(month) > now.month:
                year -= 1
            return datetime(year, int(month), int(day)).date()
        
        return None
    
    def get_page(self, page=1):
        """获取页面内容"""
        if self.current_forum_index >= len(self.forum_urls):
            return None
            
        forum_url = self.forum_urls[self.current_forum_index]
        url = f"{forum_url}&pn={(page-1)*50}"
        logger.info(f"正在获取页面: {url}")
        
        try:
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            logger.error(f"获取页面失败: {e}")
            return None
    
    def parse_post_list(self, html):
        """解析帖子列表页，提取帖子链接和日期"""
        soup = BeautifulSoup(html, 'html.parser')
        post_items = soup.select('li.j_thread_list')
        
        posts_to_scrape = []
        for item in post_items:
            try:
                # 提取帖子标题和链接
                title_elem = item.select_one('a.j_th_tit')
                if not title_elem:
                    continue
                    
                title = title_elem.text.strip()
                link = self.base_url + title_elem.get('href')
                
                # 跳过已处理的URL
                if link in self.processed_urls:
                    continue
                
                # 提取发布日期
                date_elem = item.select_one('span.pull-right.is_show_create_time')
                if not date_elem:
                    continue
                    
                date_str = date_elem.text.strip()
                post_date = self.parse_date(date_str)
                
                if not post_date:
                    logger.warning(f"无法解析日期: {date_str}")
                    continue
                
                # 检查日期是否在范围内
                if self.start_date.date() <= post_date <= self.end_date.date():
                    posts_to_scrape.append({
                        'title': title,
                        'link': link,
                        'date': post_date
                    })
                    self.processed_urls.add(link)
                elif post_date < self.start_date.date():
                    # 如果帖子日期早于开始日期，返回False表示停止爬取
                    return posts_to_scrape, False
            
            except Exception as e:
                logger.error(f"解析帖子列表项失败: {e}")
        
        return posts_to_scrape, True
    
    def get_post_content(self, url):
        """获取帖子详细内容"""
        try:
            logger.info(f"获取帖子内容: {url}")
            response = requests.get(url, headers=self.headers)
            response.raise_for_status()
            response.encoding = 'utf-8'
            return response.text
        except Exception as e:
            logger.error(f"获取帖子内容失败: {e}")
            return None
    
    def parse_post_content(self, html):
        """解析帖子详细内容，提取作者、正文和回复"""
        soup = BeautifulSoup(html, 'html.parser')
        
        try:
            # 提取作者信息
            author_elem = soup.select_one('li.d_name a')
            author = author_elem.text.strip() if author_elem else "匿名用户"
            
            # 提取帖子正文
            content_elem = soup.select_one('div.d_post_content')
            content = content_elem.text.strip() if content_elem else ""
            
            # 提取发布时间
            date_elem = soup.select_one('div.post-tail-wrap span.tail-info')
            post_date = None
            if date_elem:
                date_str = date_elem.text.strip()
                post_date = self.parse_date(date_str)
            
            # 提取回复内容
            replies = []
            reply_elems = soup.select('div.l_post')
            for reply in reply_elems:
                # 跳过主贴
                if 'l_post_bright' in reply.get('class', []):
                    continue
                    
                reply_author_elem = reply.select_one('li.d_name a')
                reply_author = reply_author_elem.text.strip() if reply_author_elem else ""
                
                reply_content_elem = reply.select_one('div.d_post_content')
                reply_content = reply_content_elem.text.strip() if reply_content_elem else ""
                
                reply_date_elem = reply.select_one('div.post-tail-wrap span.tail-info')
                reply_date = ""
                if reply_date_elem:
                    reply_date = reply_date_elem.text.strip()
                
                replies.append({
                    'author': reply_author,
                    'content': reply_content,
                    'date': reply_date
                })
            
            return {
                'author': author,
                'content': content,
                'post_date': post_date,
                'replies': replies,
                'reply_count': len(replies)
            }
        except Exception as e:
            logger.error(f"解析帖子内容失败: {e}")
            return None
    
    def scrape(self, max_pages_per_forum=10):
        """开始爬取帖子"""
        # 确保输出目录存在
        os.makedirs('data', exist_ok=True)
        
        for forum_index, forum_url in enumerate(self.forum_urls):
            self.current_forum_index = forum_index
            forum_name = forum_url.split('=')[-1]
            logger.info(f"开始爬取贴吧: {forum_name}")
            
            page = 1
            continue_scraping = True
            forum_posts = []
            
            while continue_scraping and page <= max_pages_per_forum:
                logger.info(f"正在爬取第 {page} 页...")
                
                # 获取帖子列表
                html = self.get_page(page)
                if not html:
                    logger.warning(f"第 {page} 页获取失败")
                    page += 1
                    time.sleep(5)
                    continue
                    
                # 解析帖子列表
                posts_to_scrape, continue_scraping = self.parse_post_list(html)
                
                if not posts_to_scrape:
                    logger.info(f"第 {page} 页没有符合条件的帖子")
                    page += 1
                    continue
                    
                # 爬取每个帖子的详细内容
                for i, post in enumerate(posts_to_scrape):
                    logger.info(f"正在爬取帖子 [{i+1}/{len(posts_to_scrape)}]: {post['title']}")
                    
                    # 获取帖子详细内容
                    post_content = self.get_post_content(post['link'])
                    if not post_content:
                        logger.warning(f"帖子内容获取失败: {post['title']}")
                        continue
                        
                    # 解析帖子详细内容
                    content_data = self.parse_post_content(post_content)
                    if content_data:
                        # 合并帖子信息和内容
                        full_post = {
                            **post,
                            **content_data,
                            'forum': forum_name
                        }
                        forum_posts.append(full_post)
                        logger.info(f"成功爬取: {post['title']} (作者: {content_data['author']}, 回复数: {content_data['reply_count']})")
                    else:
                        logger.warning(f"帖子内容解析失败: {post['title']}")
                    
                    # 随机延迟，避免请求过于频繁
                    sleep_time = random.uniform(2, 5)
                    time.sleep(sleep_time)
                
                # 每爬取一页保存一次数据
                if page % 1 == 0:
                    self.save_data(forum_posts, f'data/{forum_name}_posts_page_{page}.csv')
                
                # 下一页
                page += 1
                
                # 随机延迟，避免请求过于频繁
                sleep_time = random.uniform(3, 8)
                time.sleep(sleep_time)
            
            # 保存当前贴吧的最终数据
            self.save_data(forum_posts, f'data/{forum_name}_posts_full.csv')
            logger.info(f"贴吧 {forum_name} 爬取完成，共获取 {len(forum_posts)} 条帖子")
            self.posts.extend(forum_posts)
        
        # 保存所有贴吧的最终数据
        self.save_data(self.posts, 'data/all_sleep_posts_full.csv')
        logger.info(f"所有贴吧爬取完成，共获取 {len(self.posts)} 条帖子")
        
        return self.posts
    
    def save_data(self, posts, filename):
        """保存爬取的数据到CSV文件"""
        if not posts:
            logger.warning("没有数据可保存")
            return
            
        # 转换为DataFrame并保存
        data = []
        for post in posts:
            # 主贴数据
            main_post = {
                'forum': post.get('forum', ''),
                'title': post['title'],
                'url': post['link'],
                'author': post['author'],
                'post_date': post['date'],
                'content': post['content'],
                'reply_count': post.get('reply_count', 0),
                'type': '主贴'
            }
            
            data.append(main_post)
            
            # 添加回复
            for reply in post.get('replies', []):
                data.append({
                    'forum': post.get('forum', ''),
                    'title': f"回复: {post['title']}",
                    'url': post['link'],
                    'author': reply['author'],
                    'post_date': reply['date'],
                    'content': reply['content'],
                    'reply_count': 0,
                    'type': '回复'
                })
        
        df = pd.DataFrame(data)
        df.to_csv(filename, index=False, encoding='utf-8-sig')
        logger.info(f"数据已保存到 {filename}，共 {len(df)} 条记录")

# 使用示例
if __name__ == "__main__":
    # 创建爬虫实例
    scraper = BaiduTiebaScraper(start_date='2023-01-01', end_date='2025-12-31')
    
    # 开始爬取，每个贴吧爬取5页
    posts = scraper.scrape(max_pages_per_forum=5)
    
    # 打印爬取结果摘要
    print("\n爬取完成! 结果摘要:")
    print(f"总帖子数: {len(scraper.posts)}")
    
    # 按贴吧统计
    forums = {}
    for post in scraper.posts:
        forum = post.get('forum', '未知贴吧')
        forums[forum] = forums.get(forum, 0) + 1
    
    print("\n按贴吧统计:")
    for forum, count in forums.items():
        print(f"{forum}: {count} 条帖子")
    
    # 按日期统计
    dates = {}
    for post in scraper.posts:
        date = post['date'].strftime('%Y-%m') if isinstance(post['date'], datetime) else post['date']
        dates[date] = dates.get(date, 0) + 1
    
    print("\n按月份统计:")
    for date, count in sorted(dates.items()):
        print(f"{date}: {count} 条帖子")

2025-06-06 03:58:14,800 - INFO - 开始爬取贴吧: 失眠
2025-06-06 03:58:14,800 - INFO - 正在爬取第 1 页...
2025-06-06 03:58:14,800 - INFO - 正在获取页面: https://tieba.baidu.com/f?kw=失眠&pn=0
2025-06-06 03:58:15,235 - INFO - 第 1 页没有符合条件的帖子
2025-06-06 03:58:15,235 - INFO - 正在爬取第 2 页...
2025-06-06 03:58:15,240 - INFO - 正在获取页面: https://tieba.baidu.com/f?kw=失眠&pn=50
2025-06-06 03:58:15,683 - INFO - 第 2 页没有符合条件的帖子
2025-06-06 03:58:15,683 - INFO - 正在爬取第 3 页...
2025-06-06 03:58:15,683 - INFO - 正在获取页面: https://tieba.baidu.com/f?kw=失眠&pn=100
2025-06-06 03:58:16,089 - INFO - 第 3 页没有符合条件的帖子
2025-06-06 03:58:16,089 - INFO - 正在爬取第 4 页...
2025-06-06 03:58:16,089 - INFO - 正在获取页面: https://tieba.baidu.com/f?kw=失眠&pn=150
2025-06-06 03:58:16,546 - INFO - 第 4 页没有符合条件的帖子
2025-06-06 03:58:16,546 - INFO - 正在爬取第 5 页...
2025-06-06 03:58:16,546 - INFO - 正在获取页面: https://tieba.baidu.com/f?kw=失眠&pn=200
2025-06-06 03:58:16,997 - INFO - 第 5 页没有符合条件的帖子
2025-06-06 03:58:17,000 - INFO - 贴吧 失眠 爬取完成，共获取 0 条帖子
2025-06-06 03:58:17,002 - INFO - 开始


爬取完成! 结果摘要:
总帖子数: 0

按贴吧统计:

按月份统计:


In [36]:
#搭建睡眠健康系统

In [43]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.metrics import classification_report, confusion_matrix
from sklearn.preprocessing import StandardScaler
import joblib
import os
from datetime import datetime
import logging

# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

class SleepHealthSystem:
    def __init__(self, data_dir='data', model_dir='models'):
        self.data_dir = data_dir
        self.model_dir = model_dir
        
        # 创建目录
        if not os.path.exists(data_dir):
            os.makedirs(data_dir)
        if not os.path.exists(model_dir):
            os.makedirs(model_dir)
            
        # 初始化数据处理器和模型
        self.data_processor = DataProcessor()
        self.sleep_analyzer = SleepAnalyzer()
        self.recommendation_engine = RecommendationEngine()
        self.model = None
        
    def collect_data(self, data_source, **kwargs):
        """从不同来源收集数据"""
        logger.info(f"从 {data_source} 收集数据...")
        
        if data_source == 'public_dataset':
            collector = PublicDatasetCollector(self.data_dir)
            return collector.fetch_dataset(kwargs.get('dataset_name'))
        
        elif data_source == 'api':
            collector = APIConnector(self.data_dir)
            return collector.fetch_data(kwargs.get('api_name'), **kwargs)
        
        elif data_source == 'web_scraper':
            collector = WebScraper(self.data_dir)
            return collector.scrape(kwargs.get('website'), **kwargs)
        
        else:
            logger.error(f"不支持的数据来源: {data_source}")
            return None
    
    def preprocess_data(self, raw_data):
        """预处理收集到的数据"""
        logger.info("预处理数据...")
        return self.data_processor.process(raw_data)
    
    def train_model(self, processed_data, target_column='sleep_quality'):
        """训练睡眠质量预测模型"""
        logger.info("训练睡眠质量预测模型...")
        
        # 准备特征和目标变量
        X = processed_data.drop(target_column, axis=1)
        y = processed_data[target_column]
        
        # 划分训练集和测试集
        X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
        
        # 标准化特征
        scaler = StandardScaler()
        X_train_scaled = scaler.fit_transform(X_train)
        X_test_scaled = scaler.transform(X_test)
        
        # 训练模型
        self.model = GradientBoostingClassifier(n_estimators=100, random_state=42)
        self.model.fit(X_train_scaled, y_train)
        
        # 评估模型
        y_pred = self.model.predict(X_test_scaled)
        logger.info("模型评估结果:")
        logger.info(classification_report(y_test, y_pred))
        
        # 保存模型和特征列表
        joblib.dump(self.model, f"{self.model_dir}/sleep_quality_model.pkl")
        joblib.dump(scaler, f"{self.model_dir}/scaler.pkl")
        joblib.dump(X.columns.tolist(), f"{self.model_dir}/feature_list.pkl")
        
        return self.model
    
    def predict_sleep_quality(self, user_data):
        """预测用户睡眠质量"""
        if self.model is None:
            try:
                self.model = joblib.load(f"{self.model_dir}/sleep_quality_model.pkl")
                scaler = joblib.load(f"{self.model_dir}/scaler.pkl")
                feature_list = joblib.load(f"{self.model_dir}/feature_list.pkl")
            except FileNotFoundError:
                logger.error("模型文件不存在，请先训练模型")
                return None
        
        # 预处理用户数据
        processed_data = self.data_processor.process_user_data(user_data)
        
        # 确保特征顺序一致
        processed_data = processed_data[feature_list]
        
        # 标准化特征
        scaled_data = scaler.transform(processed_data)
        
        # 预测睡眠质量
        prediction = self.model.predict(scaled_data)[0]
        probability = self.model.predict_proba(scaled_data)[0]
        
        return {
            'prediction': prediction,
            'probability': probability,
            'quality_level': self.sleep_analyzer.get_quality_level(prediction)
        }
    
    def generate_recommendations(self, user_data, prediction_result):
        """生成个性化睡眠改善建议"""
        return self.recommendation_engine.generate(user_data, prediction_result)
    
    def visualize_sleep_patterns(self, user_data, output_dir='visualizations'):
        """可视化用户睡眠模式"""
        if not os.path.exists(output_dir):
            os.makedirs(output_dir)
            
        return self.sleep_analyzer.visualize(user_data, output_dir)
    
    def run_pipeline(self, data_sources, train_model=True):
        """运行完整的处理流程"""
        all_data = []
        
        # 收集多源数据
        for source in data_sources:
            data = self.collect_data(**source)
            if data is not None:
                all_data.append(data)
        
        # 合并数据
        if all_data:
            combined_data = pd.concat(all_data, ignore_index=True)
            logger.info(f"成功收集 {len(combined_data)} 条数据记录")
            
            # 预处理数据
            processed_data = self.preprocess_data(combined_data)
            
            # 训练模型
            if train_model:
                self.train_model(processed_data)
            
            return processed_data
        else:
            logger.error("未收集到任何数据")
            return None

# 数据处理器
class DataProcessor:
    def process(self, raw_data):
        """处理原始数据"""
        # 数据清洗
        data = self._clean_data(raw_data)
        
        # 特征工程
        data = self._feature_engineering(data)
        
        # 处理缺失值
        data = self._handle_missing_values(data)
        
        # 处理异常值
        data = self._handle_outliers(data)
        
        return data
    
    def process_user_data(self, user_data):
        """处理用户输入的数据"""
        # 转换为DataFrame
        if not isinstance(user_data, pd.DataFrame):
            user_data = pd.DataFrame([user_data])
        
        # 应用相同的特征工程
        user_data = self._feature_engineering(user_data)
        
        # 确保所有必要的特征都存在
        required_features = ['age', 'gender', 'sleep_duration', 'deep_sleep_ratio', 
                            'awake_count', 'bedtime_hour', 'wakeup_hour', 
                            'stress_level', 'caffeine_intake', 'physical_activity']
        
        for feature in required_features:
            if feature not in user_data.columns:
                user_data[feature] = 0  # 用默认值填充缺失特征
        
        return user_data
    
    def _clean_data(self, data):
        """清洗数据：去重、类型转换等"""
        # 去重
        data = data.drop_duplicates()
        
        # 处理日期时间
        if 'date' in data.columns:
            data['date'] = pd.to_datetime(data['date'])
        
        # 处理分类变量
        categorical_cols = data.select_dtypes(include=['object']).columns
        for col in categorical_cols:
            data[col] = data[col].astype('category')
        
        return data
    
    def _feature_engineering(self, data):
        """特征工程：创建新特征"""
        # 计算睡眠效率
        if 'deep_sleep_minutes' in data.columns and 'total_sleep_minutes' in data.columns:
            data['deep_sleep_ratio'] = data['deep_sleep_minutes'] / data['total_sleep_minutes']
        
        # 计算睡眠时长（小时）
        if 'total_sleep_minutes' in data.columns:
            data['sleep_duration'] = data['total_sleep_minutes'] / 60
        
        # 计算睡眠规律度（入睡时间与起床时间的标准差）
        if 'bedtime_hour' in data.columns and 'wakeup_hour' in data.columns:
            # 这里简化处理，实际应按用户分组计算标准差
            pass
        
        # 计算压力与睡眠的交互作用
        if 'stress_level' in data.columns and 'sleep_duration' in data.columns:
            data['stress_sleep_interaction'] = data['stress_level'] * data['sleep_duration']
        
        # 计算咖啡因与睡眠的交互作用
        if 'caffeine_intake' in data.columns and 'sleep_duration' in data.columns:
            data['caffeine_sleep_interaction'] = data['caffeine_intake'] * data['sleep_duration']
        
        return data
    
    def _handle_missing_values(self, data):
        """处理缺失值"""
        # 数值型特征用中位数填充
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        for col in numeric_cols:
            data[col] = data[col].fillna(data[col].median())
        
        # 分类型特征用众数填充
        categorical_cols = data.select_dtypes(include=['category']).columns
        for col in categorical_cols:
            data[col] = data[col].fillna(data[col].mode()[0])
        
        return data
    
    def _handle_outliers(self, data):
        """处理异常值"""
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        
        for col in numeric_cols:
            # 使用IQR方法检测异常值
            Q1 = data[col].quantile(0.25)
            Q3 = data[col].quantile(0.75)
            IQR = Q3 - Q1
            
            # 定义异常值范围
            lower_bound = Q1 - 1.5 * IQR
            upper_bound = Q3 + 1.5 * IQR
            
            # 替换异常值为边界值
            data[col] = np.where(data[col] < lower_bound, lower_bound, data[col])
            data[col] = np.where(data[col] > upper_bound, upper_bound, data[col])
        
        return data

# 睡眠分析器
class SleepAnalyzer:
    def analyze_sleep_quality(self, data):
        """分析睡眠质量"""
        results = {
            'average_sleep_duration': data['sleep_duration'].mean(),
            'average_deep_sleep_ratio': data['deep_sleep_ratio'].mean(),
            'sleep_quality_distribution': data['sleep_quality'].value_counts(),
            'correlation_matrix': data.select_dtypes(include=[np.number]).corr()
        }
        return results
    
    def get_quality_level(self, score):
        """将睡眠质量分数转换为等级"""
        if score >= 4:
            return "优秀"
        elif score >= 3:
            return "良好"
        elif score >= 2:
            return "一般"
        else:
            return "较差"
    
    def visualize(self, data, output_dir):
        """生成睡眠模式可视化"""
        visualizations = {}
        
        # 睡眠时长分布
        plt.figure(figsize=(10, 6))
        sns.histplot(data['sleep_duration'], kde=True)
        plt.title('睡眠时长分布')
        plt.xlabel('睡眠时长（小时）')
        plt.ylabel('频率')
        plt.savefig(f"{output_dir}/sleep_duration_distribution.png")
        visualizations['sleep_duration'] = f"{output_dir}/sleep_duration_distribution.png"
        
        # 睡眠质量与其他因素的关系
        plt.figure(figsize=(12, 8))
        numeric_cols = data.select_dtypes(include=[np.number]).columns
        sns.heatmap(data[numeric_cols].corr(), annot=True, cmap='coolwarm')
        plt.title('睡眠质量相关因素热力图')
        plt.savefig(f"{output_dir}/sleep_correlation_heatmap.png")
        visualizations['correlation'] = f"{output_dir}/sleep_correlation_heatmap.png"
        
        # 睡眠阶段分布
        if 'deep_sleep_ratio' in data.columns and 'light_sleep_ratio' in data.columns and 'rem_sleep_ratio' in data.columns:
            plt.figure(figsize=(10, 6))
            stages = ['深度睡眠', '浅度睡眠', 'REM睡眠']
            ratios = [data['deep_sleep_ratio'].mean(), 
                     data['light_sleep_ratio'].mean(), 
                     data['rem_sleep_ratio'].mean()]
            
            plt.pie(ratios, labels=stages, autopct='%1.1f%%', startangle=90)
            plt.title('平均睡眠阶段分布')
            plt.axis('equal')
            plt.savefig(f"{output_dir}/sleep_stages_distribution.png")
            visualizations['sleep_stages'] = f"{output_dir}/sleep_stages_distribution.png"
        
        return visualizations

# 建议引擎
class RecommendationEngine:
    def __init__(self):
        # 建议规则库
        self.rules = [
            {
                'condition': lambda data, pred: pred['prediction'] < 2,
                'recommendations': [
                    "您的睡眠质量较差，建议保持规律作息，每天固定时间上床睡觉和起床",
                    "睡前1小时避免使用电子设备，蓝光会抑制褪黑素分泌",
                    "保持卧室环境安静、黑暗和凉爽，温度建议在18-22°C之间",
                    "考虑咨询专业医生，排除潜在的睡眠障碍"
                ]
            },
            {
                'condition': lambda data, pred: pred['prediction'] < 3 and data.get('deep_sleep_ratio', 0) < 0.2,
                'recommendations': [
                    "您的深度睡眠不足，建议增加日间体力活动，但避免睡前3小时剧烈运动",
                    "尝试冥想或深呼吸练习，有助于减轻压力，提高深度睡眠质量",
                    "避免睡前摄入咖啡因和大量液体"
                ]
            },
            {
                'condition': lambda data, pred: pred['prediction'] < 3 and data.get('sleep_duration', 0) < 7,
                'recommendations': [
                    "您的睡眠时长不足，建议调整日程安排，确保每天有足够的睡眠时间",
                    "减少夜间应酬和工作，优先保障睡眠",
                    "如果有入睡困难问题，可以尝试睡前温水浴或阅读轻松的书籍"
                ]
            },
            {
                'condition': lambda data, pred: pred['prediction'] < 3 and data.get('stress_level', 0) > 7,
                'recommendations': [
                    "您的压力水平较高，建议学习压力管理技巧，如时间管理、情绪调节",
                    "考虑参加放松活动，如瑜伽、太极拳或听轻音乐",
                    "必要时寻求心理咨询帮助，缓解工作和生活压力"
                ]
            },
            {
                'condition': lambda data, pred: pred['prediction'] >= 3,
                'recommendations': [
                    "您的睡眠质量良好，请继续保持健康的睡眠习惯",
                    "定期进行体育锻炼，有助于维持良好的睡眠质量",
                    "保持均衡饮食，避免过度饮酒和吸烟"
                ]
            }
        ]
    
    def generate(self, user_data, prediction_result):
        """生成个性化建议"""
        recommendations = []
        
        # 检查每条规则
        for rule in self.rules:
            if rule['condition'](user_data, prediction_result):
                recommendations.extend(rule['recommendations'])
                break  # 找到匹配规则后停止检查
        
        # 添加基于用户特定数据的建议
        if 'caffeine_intake' in user_data and user_data['caffeine_intake'] > 3:
            recommendations.append("您的咖啡因摄入量较高，建议减少咖啡、茶和碳酸饮料的摄入，特别是在下午和晚上")
        
        if 'physical_activity' in user_data and user_data['physical_activity'] < 2:
            recommendations.append("您的体力活动不足，建议每周进行至少150分钟的中等强度有氧运动，如快走、游泳或骑自行车")
        
        return recommendations

# 数据收集器
class PublicDatasetCollector:
    def __init__(self, data_dir):
        self.data_dir = data_dir
        
    def fetch_dataset(self, dataset_name):
        """从公开数据集获取数据"""
        # 这里应该实现实际的数据下载逻辑
        # 为简化示例，我们假设数据已经下载并存储在data_dir中
        
        if dataset_name == 'sleep_quality':
            file_path = f"{self.data_dir}/sleep_quality_dataset.csv"
            try:
                return pd.read_csv(file_path)
            except FileNotFoundError:
                # 在实际应用中，这里应该实现数据下载逻辑
                logger.error(f"数据集文件不存在: {file_path}")
                return None
        
        logger.error(f"不支持的数据集: {dataset_name}")
        return None

# API连接器
class APIConnector:
    def __init__(self, data_dir):
        self.data_dir = data_dir
        
    def fetch_data(self, api_name, **kwargs):
        """通过API获取数据"""
        
        if api_name == 'fitbit':
            # 模拟从Fitbit API获取的睡眠数据
            return pd.DataFrame({
                'user_id': [kwargs.get('user_id', 1)] * 7,
                'date': pd.date_range(start=kwargs.get('start_date', '2023-01-01'), periods=7),
                'total_sleep_minutes': np.random.randint(300, 540, 7),
                'deep_sleep_minutes': np.random.randint(60, 180, 7),
                'light_sleep_minutes': np.random.randint(180, 360, 7),
                'rem_sleep_minutes': np.random.randint(60, 120, 7),
                'awake_count': np.random.randint(1, 10, 7)
            })
        
        logger.error(f"不支持的API: {api_name}")
        return None

# 网页爬虫
class WebScraper:
    def __init__(self, data_dir):
        self.data_dir = data_dir
        
    def scrape(self, website, **kwargs):
        """从网页爬取数据"""
        
        if website == 'medical_forum':
            # 模拟从医疗论坛爬取的睡眠相关帖子
            return pd.DataFrame({
                'post_id': range(1, 101),
                'title': [f"关于睡眠问题的讨论 {i}" for i in range(1, 101)],
                'content': [f"我最近睡眠质量{i%5}，不知道该怎么办..." for i in range(1, 101)],
                'author': [f"用户{i}" for i in range(1, 101)],
                'date': pd.date_range(start='2023-01-01', periods=100)
            })
        
        logger.error(f"不支持的网站: {website}")
        return None

# 网页应用
def create_web_app(system):
    """创建Web应用界面"""
    try:
        import streamlit as st
        from datetime import datetime
        
        st.title("睡眠质量分析与改善建议系统")
        
        # 用户数据输入
        st.sidebar.header("用户数据输入")
        
        user_id = st.sidebar.text_input("用户ID", "user123")
        age = st.sidebar.slider("年龄", 18, 80, 30)
        gender = st.sidebar.selectbox("性别", ["男", "女", "其他"])
        
        sleep_duration = st.sidebar.slider("平均睡眠时长(小时)", 4.0, 12.0, 7.0, 0.5)
        deep_sleep_ratio = st.sidebar.slider("深度睡眠比例", 0.0, 1.0, 0.25, 0.05)
        awake_count = st.sidebar.slider("夜间醒来次数", 0, 10, 2)
        
        bedtime_hour = st.sidebar.slider("通常入睡时间(小时)", 18, 24, 23)
        wakeup_hour = st.sidebar.slider("通常起床时间(小时)", 5, 10, 7)
        
        stress_level = st.sidebar.slider("压力水平(1-10)", 1, 10, 5)
        caffeine_intake = st.sidebar.slider("咖啡因摄入量(杯/天)", 0, 10, 2)
        physical_activity = st.sidebar.slider("体力活动水平(1-10)", 1, 10, 5)
        
        # 分析按钮
        if st.sidebar.button("分析睡眠质量"):
            user_data = {
                'user_id': user_id,
                'age': age,
                'gender': gender,
                'sleep_duration': sleep_duration,
                'deep_sleep_ratio': deep_sleep_ratio,
                'awake_count': awake_count,
                'bedtime_hour': bedtime_hour,
                'wakeup_hour': wakeup_hour,
                'stress_level': stress_level,
                'caffeine_intake': caffeine_intake,
                'physical_activity': physical_activity
            }
            
            # 预测睡眠质量
            prediction = system.predict_sleep_quality(user_data)
            
            # 生成建议
            recommendations = system.generate_recommendations(user_data, prediction)
            
            # 可视化睡眠模式
            visualizations = system.visualize_sleep_patterns(pd.DataFrame([user_data]))
            
            # 显示结果
            st.header("睡眠质量分析结果")
            
            st.subheader("睡眠质量评分")
            st.write(f"您的睡眠质量评分为: {prediction['prediction']}/5 ({prediction['quality_level']})")
            
            st.subheader("睡眠质量预测概率")
            st.bar_chart({
                "评分": [1, 2, 3, 4, 5],
                "概率": prediction['probability']
            })
            
            st.subheader("睡眠模式可视化")
            if 'sleep_duration' in visualizations:
                st.image(visualizations['sleep_duration'], caption="睡眠时长分布")
            
            if 'sleep_stages' in visualizations:
                st.image(visualizations['sleep_stages'], caption="睡眠阶段分布")
            
            st.subheader("个性化改善建议")
            for i, rec in enumerate(recommendations, 1):
                st.write(f"{i}. {rec}")
            
            st.subheader("健康管理计划")
            st.write("根据您的睡眠分析结果，我们建议您制定以下健康管理计划：")
            st.write("- 保持规律的作息时间，每天尽量在相同的时间上床睡觉和起床")
            st.write("- 睡前1小时避免使用电子设备，减少蓝光对睡眠的干扰")
            st.write("- 创造安静、舒适的睡眠环境，保持卧室温度适宜")
            st.write("- 定期进行体育锻炼，但避免在睡前3小时内剧烈运动")
            st.write("- 记录睡眠日记，跟踪睡眠质量变化，必要时咨询专业医生")
            
            st.success("分析完成！建议您定期使用本系统监测睡眠质量变化。")
    
    except ImportError:
        logger.error("请安装Streamlit库以运行Web应用: pip install streamlit")

# 主程序入口
if __name__ == "__main__":
    # 创建系统实例
    system = SleepHealthSystem()
    
    # 运行数据收集和处理流程
    data_sources = [
        {
            'data_source': 'public_dataset',
            'dataset_name': 'sleep_quality'
        },
        {
            'data_source': 'api',
            'api_name': 'fitbit',
            'user_id': 'test_user',
            'start_date': '2023-01-01'
        },
        {
            'data_source': 'web_scraper',
            'website': 'medical_forum'
        }
    ]
    
    # 处理数据并训练模型
    processed_data = system.run_pipeline(data_sources)
    
    # 创建并运行Web应用
    create_web_app(system)
