In [40]:
# 导入第三方库
import pandas as pd
import numpy as np
import datetime
from pyecharts import charts
from pyecharts import options as opts
from bs4 import BeautifulSoup
import re
import AIUESAGENT as AT

# 参数设置
DATA_PATH = 'data/cost_data.xlsx'

In [41]:
def proc_washing_data(file_path):
    '''
    数据清洗

    参数:
        file_path (str): 数据文件路径

    返回:
        df (DataFrame): 清洗之后的数据
    '''
    df = pd.read_excel(file_path)

    # 列名替换
    idx = df.index
    up_columns = df.columns.values
    low_columns = df.iloc[0,:].fillna('error').values
    new_low_columns = low_columns
    for i,j in enumerate(low_columns):
        if j == 'error':
            new_low_columns[i] = up_columns[i]
    df = df.iloc[1:,:]
    df.columns = new_low_columns

    # 数据格式转换
    df['日期'] = pd.to_datetime(df['日期'], format='%Y%m%d')
    
    return df

In [42]:
def extract_monthly_expenses(df, year, month):
    """
    提取指定年月的每日消费数据

    参数:
        df (DataFrame): 数据
        year (int): 指定年份，如 2024
        month (int): 指定月份，如 3 表示 3月

    返回:
        pd.DataFrame: 指定月份的消费数据，若无数据则返回空 DataFrame
    """

    # 提取目标年月的数据
    mask = (df['日期'].dt.year == year) & (df['日期'].dt.month == month)
    monthly_data = df[mask].copy()

    # 填充缺失的消费金额
    monthly_data.fillna(0, inplace=True)

    # 按日期排序
    monthly_data.sort_values('日期', inplace=True)

    return monthly_data.reset_index(drop=True)

In [43]:
def extract_chart_from_html(source_html_path):
    # 读取源HTML文件
    with open(source_html_path, 'r', encoding='utf-8') as f:
        source_content = f.read()
    
    # 解析HTML
    soup = BeautifulSoup(source_content, 'html.parser')
    
    # 提取图表容器（通常带有特定ID或class）
    chart_div = soup.find('body').find('div')
    chart_script = soup.find('body').find('script')
    
    # 返回提取的图表内容
    return (chart_div, chart_script)

# print(extract_chart_from_html('bar.html'))

In [44]:
def save_report_to_html(report, title, tables):
    """
    保存报告到 HTML 文件

    Args:
        report (str): 报告内容
        title (str): 报告标题
        tables (list): 表格列表
        images (list): 图片列表

    Returns:
        None
    """

    images = [1]

    # 基本数据
    section1 = re.search(re.compile(r"<title 1>(.*?)<title 2>", re.S), report).group().split('\n',1)
    section1_title = section1[0].split('<title 1>')[1]
    section1_content = section1[1].split('<title 2>')[0].split('\n')
    section1_content = '</p><p>'.join(section1_content)
    images[0] = extract_chart_from_html('image/section_1.html')
    
    section2 = re.search(re.compile(r"<title 2>(.*?)<title 3>", re.S), report).group().split('\n',1)
    section2_title = section2[0].split('<title 2>')[1]
    section2_content = section2[1].split('<title 3>')[0].split('\n')
    section2_content = '</p><p>'.join(section2_content)

    section3 = re.search(re.compile(r"<title 3>(.*?)<title 4>", re.S), report).group().split('\n',1)
    section3_title = section3[0].split('<title 3>')[1]
    section3_content = section3[1].split('<title 4>')[0].split('\n')
    section3_content = '</p><p>'.join(section3_content)

    section4 = re.search(re.compile(r"<title 4>(.*?)<table 1>", re.S), report).group().split('\n',1)
    section4_title = section4[0].split('<title 4>')[1]
    df_html = tables[0].to_html(classes="table table-striped", index=False)

    # HTML 模板
    html = f"""
    <!DOCTYPE html>
    <html>
    <head>
        <meta charset="utf-8">
        <title>{title[:-5]}</title>
        <link rel="stylesheet" href="https://cdn.jsdelivr.net/npm/bootstrap@5.3.0/dist/css/bootstrap.min.css">
        <meta charset="UTF-8">
        <script type="text/javascript" src="https://assets.pyecharts.org/assets/v5/echarts.min.js"></script>
    </head>
    <body class="container mt-4">
        <h1 class="text-center mb-4">{title[:-5]}</h1>

        <h3>{section1_title}</h3>
        <p>{images[0][0]}{images[0][1]}</p>
        <p>{section1_content}</p>

        <h3>{section2_title}</h3>
        <p>{section2_content}</p>

        <h3>{section3_title}</h3>
        <p>{section3_content}</p>

        <h3>{section4_title}</h3>
        {df_html}
    </body>
    </html>
    """
    with open(f"{title}", "w", encoding="utf-8") as f:
        f.write(html)

In [45]:
def analyze_and_generate_report(df, year, month):
    """
    分析指定年月的消费数据并生成详细报告
    
    参数:
        df (DataFrame): 数据
        year (int): 指定年份
        month (int): 指定月份
    
    返回:
        dict: 包含分析结果和报告的字典
    """
    # 获取指定月份的数据
    monthly_data = extract_monthly_expenses(df, year, month)
    
    if monthly_data.empty:
        return {
            "summary": f"未找到 {year}年{month}月 的消费记录。",
            "data": monthly_data,
            "statistics": None,
            "report": ""
        }
    
    # 数据分析部分
    # 1. 基础统计信息
    total_expense = monthly_data['总支出/天'].sum()
    average_daily_expense = total_expense / len(monthly_data['日期'].dt.date.unique())
    total_common_income = monthly_data['总收入/天'].sum()
    average_daily_income = total_expense / len(monthly_data['日期'].dt.date.unique())
    total_benefit = monthly_data['总收入/天'].sum() - monthly_data['总支出/天'].sum()
    average_daily_benefit = total_expense / len(monthly_data['日期'].dt.date.unique())

    # 2. 分类统计（如果有category列）
    category_stats = {}
    for col in df.columns[6:-4]:
        if col != '总支出/天' and col != '日期' and col != '总收入/天':
            # monthly_data[col] = monthly_data[col].astype(str)
            category_stats[col] = monthly_data[col].sum()
    income_stats = {}
    for col in df.columns[-4:]:
        if col != '总支出/天' and col != '日期' and col != '总收入/天':
            # monthly_data[col] = monthly_data[col].astype(str)
            income_stats[col] = monthly_data[col].sum()
    # print(category_stats)
    
    # 3. 日消费趋势
    daily_expense = {}
    if '总支出/天' in monthly_data.columns[6:-4]:
        daily_expense = monthly_data.groupby(monthly_data['日期'].dt.date)['总支出/天'].sum().to_dict()
    daily_income = {}
    if '总收入/天' in monthly_data.columns[-4:]:
        daily_income = monthly_data.groupby(monthly_data['日期'].dt.date)['总收入/天'].sum().to_dict()
    # print(daily_expense)
    
    # 组织统计数据
    statistics = {
        "total_expense": total_expense,
        "average_daily_expense": average_daily_expense,
        "total_common_income": total_common_income,
        "total_expense_with_income": total_benefit,
        "category_stats": category_stats,
        "daily_expense": daily_expense,
        "income_stats": income_stats,
        "daily_income": daily_income
    }

    # 生成报告文本与图片
    report_lines = []
    system_prompt = '''你是一名数据分析专家，请根据提供的数据，生成一份详细消费报告。'''

    # 第一部分的文字内容
    report_lines.append("<title 1>一、总体概况")
    prompt_1 = f'''
    以下为消费金额占比:
    总支出金额: ¥{total_expense:.2f},
    日均消费金额: ¥{average_daily_expense:.2f},
    总收入金额: ¥{total_common_income:.2f},
    日均收入金额: ¥{average_daily_income:.2f},
    总净收入: ¥{total_benefit:.2f},
    日均净收入: ¥{average_daily_benefit:.2f}
    '''
    report_lines.append(AT.AIUESAgent().get_response(prompt_1,system_prompt))
    report_lines.append("")
    # 第一部分的图片内容
    (charts.Grid()
     .add(
        charts.Bar()
        .add_xaxis(['总支出金额', '总收入金额', '总净收入'])
        .add_yaxis("金额", [total_expense, total_common_income, total_benefit])
        .set_global_opts(title_opts=opts.TitleOpts(title="消费金额")),
        grid_opts=opts.GridOpts(pos_left="0%", pos_right="50%"))
    .add(
        charts.Bar()
        .add_xaxis(['日均消费金额', '日均收入金额', '日均净收入'])
        .add_yaxis("金额", [average_daily_expense, average_daily_income, average_daily_benefit])
        .set_global_opts(title_opts=opts.TitleOpts(title="消费金额")),
        grid_opts=opts.GridOpts(pos_left="50%", pos_right="0%"))
    .render('image/section_1.html'))
    
    # 第二部分的文字内容
    if category_stats:
        report_lines.append("<title 2>二、分类收支统计")
        report_lines.append("  以下为消费金额占比:")
        sorted_categories = sorted(category_stats.items(), key=lambda x: x[1], reverse=True)
        for category, amount in sorted_categories:
            percentage = (amount / total_expense * 100) if total_expense > 0 else 0
            report_lines.append(f"  {category}: ¥{amount:.2f} ({percentage:.1f}%)")
        sorted_income = sorted(income_stats.items(), key=lambda x: x[1], reverse=True)
        report_lines.append("  以下为收入金额占比:")
        for income, amount in sorted_income:
            percentage = (amount / total_common_income * 100) if total_common_income > 0 else 0
            report_lines.append(f"  {income}: ¥{amount:.2f} ({percentage:.1f}%)")
        report_lines.append("")
        # 第二部分的图片内容
    
    if daily_expense:
        report_lines.append("<title 3>三、日消费趋势")
        max_expense_date = max(daily_expense.items(), key=lambda x: x[1])
        min_expense_date = min(daily_expense.items(), key=lambda x: x[1])
        max_income_date = max(daily_income.items(), key=lambda x: x[1])
        min_income_date = min(daily_income.items(), key=lambda x: x[1])
        report_lines.append("  以下为消费金额:")
        report_lines.append(f"  消费最高日: {max_expense_date[0]} - ¥{max_expense_date[1]:.2f}")
        report_lines.append(f"  消费最低日: {min_expense_date[0]} - ¥{min_expense_date[1]:.2f}")
        report_lines.append("  以下为收入金额:")
        report_lines.append(f"  收入最高日: {max_income_date[0]} - ¥{max_income_date[1]:.2f}")
        report_lines.append(f"  收入最低日: {min_income_date[0]} - ¥{min_income_date[1]:.2f}")
        report_lines.append("")
    
    # 添加数据详情
    report_lines.append("<title 4>四、详细消费记录")
    report_lines.append("<table 1>")
    
    report = "\n".join(report_lines)
    
    # 打印报告
    # print(report)

    # 保存报告至 html 文件
    save_report_to_html(report, f"{year}年{month}月消费报告.html", [monthly_data])
    
    # 返回结构化结果
    return {
        "summary": f"{year}年{month}月共消费¥{total_expense:.2f}",
        "data": monthly_data,
        "statistics": statistics,
        "report": report
    }



In [46]:
# --------------------------
# 使用示例
# --------------------------
if __name__ == "__main__":
    # 示例调用
    df = proc_washing_data(DATA_PATH)
    target_year = 2025
    target_month = 11  # 11月

    result = extract_monthly_expenses(df, target_year, target_month)

    analyze_and_generate_report(df, target_year, target_month)['report']

  monthly_data.fillna(0, inplace=True)
  monthly_data.fillna(0, inplace=True)
