In [17]:
import os
from docx import Document
from reportlab.pdfgen import canvas
from reportlab.lib.pagesizes import letter

def save_dict_to_file(dictionary, file_path, file_name, file_type, keys=None):
    """
    将字典保存为本地文件，支持 Word、PDF 和 Markdown 格式。
    
    参数:
        dictionary (dict): 要保存的字典
        file_path (str): 文件保存路径
        file_name (str): 文件名
        file_type (str): 文件类型，支持 'word'、'pdf' 或 'markdown'
        keys (list, optional): 要保存的键列表。如果为 None，则保存整个字典。
    """
    # 如果指定了 keys，则过滤字典
    if keys:
        dictionary = {key: dictionary[key] for key in keys if key in dictionary}
    
    # 确保文件路径存在
    os.makedirs(file_path, exist_ok=True)
    
    # 根据文件类型选择保存方式
    if file_type.lower() == 'word':
        # 保存为 Word 文件
        doc = Document()
        for key, value in dictionary.items():
            doc.add_heading(key, level=1)
            doc.add_paragraph(str(value))
        doc.save(os.path.join(file_path, f"{file_name}.docx"))
        print(f"字典已保存为 Word 文件: {file_name}.docx")
    
    elif file_type.lower() == 'pdf':
        # 保存为 PDF 文件
        pdf_path = os.path.join(file_path, f"{file_name}.pdf")
        c = canvas.Canvas(pdf_path, pagesize=letter)
        width, height = letter
        y = height - 100
        
        for key, value in dictionary.items():
            c.setFont("Helvetica-Bold", 14)
            c.drawString(50, y, key)
            y -= 20
            c.setFont("Helvetica", 12)
            c.drawString(70, y, str(value))
            y -= 30
            if y < 50:
                c.showPage()
                y = height - 100
        
        c.save()
        print(f"字典已保存为 PDF 文件: {file_name}.pdf")
    
    elif file_type.lower() == 'markdown':
        # 保存为 Markdown 文件
        md_path = os.path.join(file_path, f"{file_name}.md")
        with open(md_path, 'w', encoding='utf-8') as f:
            for key, value in dictionary.items():
                f.write(f"### {key}\n")
                f.write(f"{value}\n\n")
        print(f"字典已保存为 Markdown 文件: {file_name}.md")
    
    else:
        raise ValueError("不支持的文件类型，请选择 'word'、'pdf' 或 'markdown'")

# 示例用法
if __name__ == "__main__":
    my_dict = {
        "姓名": "张三",
        "年龄": 30,
        "职业": "程序员",
        "兴趣爱好": ["编程", "阅读", "旅行"]
    }
    
    # 保存指定键为 Word 文件
    save_dict_to_file(my_dict, "Results/output", "example_selected", "word", keys=["姓名", "职业"])
    
    # 保存指定键为 PDF 文件
    save_dict_to_file(my_dict, "Results/output", "example_selected", "pdf", keys=["年龄", "兴趣爱好"])
    
    # 保存指定键为 Markdown 文件
    save_dict_to_file(my_dict, "Results/output", "example_selected", "markdown", keys=["姓名", "兴趣爱好"])

字典已保存为 Word 文件: example_selected.docx
字典已保存为 PDF 文件: example_selected.pdf
字典已保存为 Markdown 文件: example_selected.md


In [None]:
import asyncio
from typing import Dict, Any, List, Tuple
from langchain_openai import ChatOpenAI

from memory.draft import rural_DraftState
from guidelines import guidelines

from dotenv import load_dotenv
load_dotenv()


class IndustryChainAgent:
    """
    产业链分析智能体，用于分析核心产业的上下游产业链，并提出优化建议。
    """

    def __init__(self):
        """
        初始化产业链分析智能体。
        """
        self.chain_segments = {
            "upstream": "上游原材料供应，例如种子、肥料、农药等",
            "midstream": "中游生产加工，例如种植、采摘、初加工等",
            "downstream": "下游市场销售，例如包装、物流、销售等",
            "supporting_services": "配套服务，例如农业技术培训、农产品质量检测等"
        }

    async def analyze_segment(self, draft: rural_DraftState, segment: str) -> Dict[str, Any]:
        """
        分析单个产业链环节。

        :param draft: rural_DraftState 实例
        :param segment: 产业链环节（例如 upstream, midstream 等）
        :param description: 产业链环节的描述
        :return: 分析结果（JSON 格式）
        """
        print(f"开始分析产业链{segment}环节\n")
        # 构造提示信息
        prompt = f'''
请根据以下信息分析{draft["village_name"]}村的核心产业（{draft["local_condition"]["core_industry"]}）的{segment}环节，并提出优化建议：

**村庄基本信息**：{draft["document"]}和基本信息分析{draft["navigate_analysis"]}

**要求**：
- 分析该环节的现状和问题。
- 提出具体的优化建议。
- 请以 JSON 格式返回结果，格式如下：
  {{
    "segment": "{segment}",
    "current_status": "现状描述",
    "issues": "问题列表",
    "recommendations": "优化建议"
  }}
'''

        try:
            # 调用模型生成分析报告
            response = await ChatOpenAI(model_name=draft["model"]).ainvoke([{"role": "user", "content": prompt}])
            print(f"产业链{segment}环节分析完成")
            return response.content
        except Exception as e:
            print(f"分析 {segment} 环节时出错：{e}")
            return {"segment": segment, "error": str(e)}

    async def parallel_analyze_chains(self, draft: rural_DraftState) -> Dict[str, Any]:
        """
        并行分析核心产业的上下游产业链。

        :param draft: rural_DraftState 实例
        :return: 更新后的 draft_state，包含产业链分析结果和优化建议
        """
        print("开始并行分析产业链\n")
        tasks = []

        # 为每个产业链环节创建分析任务
        for segment, description in self.chain_segments.items():
            task = self.analyze_segment(draft, segment)
            tasks.append(task)

        # 并行执行所有任务
        results = await asyncio.gather(*tasks)

        # 合并结果到 draft 中
        if "industry_chain" not in draft:
            draft["industry_chain"] = {}
        draft["industry_chain"] = {result: result for result in results if "segment" in result}

        print("并行分析完成")
        return draft  # 返回最终的 draft_state


if __name__ == "__main__":
    # 创建 rural_DraftState 实例
    draft = rural_DraftState(
        document="""
                金田村自然环境很好，适合特色农产品种植。
                """,
        village_name="金田村",
        model="qwen/qwen2.5-vl-32b-instruct:free",
    )

    # 初始化产业链分析智能体
    industry_chain_agent = IndustryChainAgent()

    # 并行分析产业链
    result_draft = asyncio.run(industry_chain_agent.parallel_analyze_chains(draft=draft))

  

In [None]:
import json
import re

# 示例字符串
result = """
```json
{
  "task": "current_core_industry",
  "current_status": "当前核心产业的现状描述",
  "issues": "问题列表",
  "recommendations": "优化建议"
}
"""
json_match = re.search(r"```json\n(.*?)\n```", result, re.DOTALL)
print(json_match)
# 去掉多余的引号和反引号
cleaned_result = result.strip("```json")
print(cleaned_result)

# 将字符串解析为JSON
try:
    data = json.loads(cleaned_result)
    # 读取task字段的内容
    task_content = data.get("task")
    print("task字段的内容是:", task_content)
except json.JSONDecodeError as e:
    print("JSON解析错误:", e)


```json
{
  "task": "current_core_industry",
  "current_status": "当前核心产业的现状描述",
  "issues": "问题列表",
  "recommendations": "优化建议"
}

JSON解析错误: Expecting value: line 2 column 1 (char 1)
