### 第一模块：负责处理图片上传、处理和保存结果等操作。

In [1]:
# pip install gTTS

In [2]:
import requests
import json
import time
import pandas as pd
import os
from datetime import datetime
import mimetypes
import gradio as gr
import asyncio
import time
global bom_text
global reqdoc_text

# API URL
API_URL = "https://genshinimpact.site/v1/workflows/run"
UPLOAD_URL = "https://genshinimpact.site/v1/files/upload"

# 默认提示文本
DEFAULT_PROMPT = "请输入您的需求"

def upload_image(file_path, user_id, api_key):
    """上传图片文件并返回文件 ID"""
    try:
        mime_type, _ = mimetypes.guess_type(file_path)
        if not mime_type:
            mime_type = "application/octet-stream"
        with open(file_path, "rb") as f:
            files = {"file": (os.path.basename(file_path), f, mime_type)}
            data = {"user": user_id}
            headers = {"Authorization": f"Bearer {api_key}"}
            response = requests.post(UPLOAD_URL, headers=headers, files=files, data=data)
            response.raise_for_status()
            return response.json().get("id")
    except Exception as e:
        print(f"\n❌ 图片上传失败: {str(e)}")
        return None

def process_stream(response, bom_key, reqdoc_key, log_callback=None):
    """流式处理响应数据，同时通过回调打印进度日志"""
    outputs = {bom_key: "", reqdoc_key: ""}
    if log_callback:
        log_callback("\n🔄 处理进度：")
    else:
        print("\n🔄 处理进度：")
    try:
        for raw_line in response.iter_lines():
            if not raw_line or raw_line == b': ping\n':
                continue
            try:
                decoded_line = raw_line.decode('utf-8').strip()
                if not decoded_line.startswith('data:'):
                    continue
                json_str = decoded_line[5:].strip()
                if not json_str:
                    continue
                event_data = json.loads(json_str)
                if event_data.get("event") == "node_started":
                    msg = f"▷ 正在处理节点：{event_data['data'].get('title', '未知节点')}"
                    if log_callback:
                        log_callback(msg)
                    else:
                        print(msg)
                elif event_data.get("event") == "node_finished":
                    node_status = event_data['data'].get('status')
                    status_icon = "✓" if node_status == "succeeded" else "✗"
                    msg = f"{status_icon} 节点状态：{node_status} | 耗时：{event_data['data'].get('elapsed_time', 0):.1f}s"
                    if log_callback:
                        log_callback(msg)
                    else:
                        print(msg)
                if event_data.get("event") == "workflow_finished":
                    outputs = event_data.get("data", {}).get("outputs", {})
                    outputs = {k: v or "" for k, v in outputs.items()}
            except json.JSONDecodeError as e:
                err_msg = f"⚠ JSON 解析失败 | 原始数据：{json_str[:100]}... | 错误：{str(e)}"
                if log_callback:
                    log_callback(err_msg)
                else:
                    print(err_msg)
            except KeyError as e:
                err_msg = f"⚠ 数据字段缺失：{str(e)}"
                if log_callback:
                    log_callback(err_msg)
                else:
                    print(err_msg)
    except requests.exceptions.ChunkedEncodingError as e:
        err_msg = f"⚠ 网络连接异常中断：{str(e)}"
        if log_callback:
            log_callback(err_msg)
        else:
            print(err_msg)
    except Exception as e:
        err_msg = f"⚠ 未处理的异常：{str(e)}"
        if log_callback:
            log_callback(err_msg)
        else:
            print(err_msg)
    return {
        bom_key: outputs.get(bom_key, ""),
        reqdoc_key: outputs.get(reqdoc_key, "")
    }

def save_markdown(content, bom_key, reqdoc_key):
    """保存结果到 Markdown 文件"""
    timestamp = time.strftime("%Y%m%d-%H%M%S")
    filename = f"output_{timestamp}.md"
    md_content = f"""# 处理结果

## {bom_key.capitalize()}

{content[bom_key] or "无内容"}

---

## {reqdoc_key.capitalize()} 

{content[reqdoc_key] or "无内容"}
"""
    with open(filename, "w", encoding="utf-8") as f:
        f.write(md_content)
    print(f"\n✅ 结果已保存至：{os.path.abspath(filename)}")

async def process_image(image, text_input):
    """
    异步生成器函数，处理图片及文本输入，实时 yield 进度日志给前端显示。
    """
    log_text = ""

    def add_log(message):
        nonlocal log_text
        log_text += message + "\n"
        print(message)

    if not text_input:
        text_input = DEFAULT_PROMPT

    api_key = 'app-sYYYxIHMgLR68pRegzfg4Zel'
    user_id = 'WPP_JKW'
    bom_key = 'BOM文件'
    reqdoc_key = '需求文档'

    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "text/event-stream",
        "Content-Type": "application/json"
    }
    payload = {
        "inputs": {},
        "response_mode": "streaming",
        "user": user_id
    }

    if image is not None:
        temp_file_path = "temp_image.png"
        image.save(temp_file_path, format="PNG")

        add_log("⏫ 正在上传图片...")
        yield log_text

        image_id = upload_image(temp_file_path, user_id, api_key)
        if image_id is None:
            add_log("❌ 图片上传失败")
            yield log_text
            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)
            return

        payload["inputs"]["image"] = {
            "transfer_method": "local_file",
            "upload_file_id": image_id,
            "type": "image"
        }
        if os.path.exists(temp_file_path):
            os.remove(temp_file_path)

    if text_input:
        payload["inputs"]["text_in"] = text_input

    try:
        add_log("\n🚀 开始处理请求...")
        yield log_text

        await asyncio.sleep(1)
        yield log_text

        with requests.post(API_URL, headers=headers, json=payload, stream=True, timeout=120) as resp:
            resp.raise_for_status()
            result = process_stream(resp, bom_key, reqdoc_key, add_log)

        await asyncio.sleep(1)
        yield log_text
        save_markdown(result, bom_key, reqdoc_key)
        add_log("\n✅ 处理完成，结果已保存至 Markdown 文件。")
        add_log(f"{bom_key}: {result[bom_key]}")
        add_log(f"{reqdoc_key}: {result[reqdoc_key]}")
        global bom_text, reqdoc_text
        bom_text = result[bom_key]
        reqdoc_text = result[reqdoc_key]
        yield log_text
        return
    except requests.exceptions.RequestException as e:
        add_log(f"\n❌ 请求失败: {str(e)}")
        yield log_text
        return
    except Exception as e:
        add_log(f"\n⚠ 发生意外错误: {str(e)}")
        yield log_text
        return

  from .autonotebook import tqdm as notebook_tqdm


### 第二模块: 负责数据库搜索、分析电器元件和计算总价等操作。

In [3]:
import pandas as pd
import os
from datetime import datetime
import io

# 全局变量，用于存储 BOM 数据
bom_data = None


def search_database(selected_website):
    global bom_data
    # 根据选择的网站生成数据
    text_result = f"选择的网站：{selected_website}"
    if bom_data is None:
        device_details = [{"器件名称": "无", "单价": 0, "数量": 0, "总价": 0}]
    else:
        device_details = []
        for _, row in bom_data.iterrows():
            price = row['price'] if 'price' in row else 0
            # 如果选择了华秋商城，价格打 9 折
            if selected_website == "华秋商城":
                price = price * 0.9
            device_details.append({
                "器件名称": row['元器件型号'],
                "单价": price,
                "数量": row['数量'],
                "总价": price * row['数量']
            })

    df = pd.DataFrame(device_details)
    return text_result, df


def get_latest_output_file():
    # 获取当前目录下的所有文件
    files = os.listdir('.')
    # 筛选出以 output_ 开头，以 .md 结尾的文件
    output_files = [f for f in files if f.startswith('output_') and f.endswith('.md')]
    if not output_files:
        return None
    # 从文件名中提取时间戳
    timestamps = []
    for file in output_files:
        try:
            timestamp_str = file.split('_')[1].split('.')[0]
            timestamp = datetime.strptime(timestamp_str, '%Y%m%d-%H%M%S')
            timestamps.append((timestamp, file))
        except ValueError:
            continue
    # 按时间戳排序，取最新的文件
    timestamps.sort(reverse=True)
    return timestamps[0][1]


def extract_csv_from_md(file_content):
    start_index = file_content.find('```csv')
    end_index = file_content.find('```', start_index + len('```csv'))
    if start_index == -1 or end_index == -1:
        return None
    csv_content = file_content[start_index + len('```csv'):end_index].strip()
    return csv_content


def analyze_components():
    global bom_data
    latest_file = get_latest_output_file()
    if latest_file is None:
        text_result = "未找到符合条件的文件"
        device_details = [{"器件名称": "无", "单价": 0, "数量": 0, "总价": 0}]
        df = pd.DataFrame(device_details)
        return text_result, df
    try:
        with open(latest_file, 'r', encoding='utf-8') as file:
            file_content = file.read()
        csv_content = extract_csv_from_md(file_content)
        if csv_content is None:
            text_result = f"未在文件 {latest_file} 中找到 CSV 数据"
            device_details = [{"器件名称": "无", "单价": 0, "数量": 0, "总价": 0}]
            df = pd.DataFrame(device_details)
            return text_result, df
        bom_data = pd.read_csv(io.StringIO(csv_content))
        # 这里可以根据实际情况添加 'price' 列，暂时初始化为 0
        if 'price' not in bom_data.columns:
            bom_data['price'] = 0
        bom_data['总价'] = bom_data['price'] * bom_data['数量']
        text_result = f"已读取 BOM 文件: {latest_file}"
        device_details = []
        for _, row in bom_data.iterrows():
            device_details.append({
                "器件名称": row['元器件型号'],
                "单价": row['price'],
                "数量": row['数量'],
                "总价": row['price'] * row['数量']
            })
        df = pd.DataFrame(device_details)
        return text_result, df
    except FileNotFoundError:
        text_result = f"未找到文件: {latest_file}"
        device_details = [{"器件名称": "无", "单价": 0, "数量": 0, "总价": 0}]
        df = pd.DataFrame(device_details)
        return text_result, df


def calculate_total_price(df):
    df['总价'] = df['单价'] * df['数量']
    total = df['总价'].sum()
    return df, total


### 第三模块 ： 负责生成部署指南

In [4]:
#负责生成部署指南。
from openai import OpenAI

def generate_deployment_guide(requirement_doc, bom_data):
    client = OpenAI(
        base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
        api_key="sk-dd280b67097548a8ab1b1ccd9b767569"
    )
    prompt = f"""【部署指南生成提示】
基于以下需求文档：
{requirement_doc}

以及BOM数据：
{bom_data}

请生成500字左右的详细部署指南，说明部署步骤、环境要求及注意事项，并尽量优化部署方案和提示细节。"""
    completion = client.chat.completions.create(
        model="deepseek-r1",
        messages=[{"role": "user", "content": prompt}],
        temperature=0.3,
        top_p=0.7,
        max_tokens=4096,
        stream=True
    )

    guide_text = ""
    for chunk in completion:
        if chunk.choices[0].delta.content is not None: 
            guide_text += chunk.choices[0].delta.content
    return guide_text

# 文字转语音函数
def text_to_speech(text):
    try:
        lang_code = 'zh-CN'  # 假设使用中文语音
        tts = gTTS(text=text, lang=lang_code)
        timestamp = str(int(time.time()))
        filename = f"output_{timestamp}.mp3"
        tts.save(filename)

        # 删除旧的临时文件
        for old_file in glob.glob("output_*.mp3"):
            if old_file != filename:
                os.remove(old_file)

        return filename
    except Exception as e:
        print(f"发生错误: {e}")
        return None

def generate_and_read_deployment_guide():
    deployment_guide = generate_deployment_guide(requirement_doc=reqdoc_text,bom_data=bom_text)
    audio_file = text_to_speech(deployment_guide)
    return deployment_guide, audio_file

### 模块四 生成代码

In [10]:
import requests
import json
import matplotlib.pyplot as plt
def generate_code(requirement_document, bom_csv, user_id="WPP_JKW", conversation_id=None):
    """
    调用Dify Agent生成代码（支持CSV格式BOM输入）
    
    参数：
    api_key - 你的API密钥
    requirement_document - 需求文档内容（字符串）
    bom_csv - BOM列表（CSV格式字符串）
    user_id - 用户标识（默认"user_123"）
    conversation_id - 会话ID（可选）
    
    返回：
    生成的代码内容
    """
    api_key="app-EvqSJJJldKdMwVtzja9C4Gc2"
    headers = {
        "Authorization": f"Bearer {api_key}",   
        "Content-Type": "application/json"
    }

    payload = {
        "inputs": {
            "requirement_document": requirement_document,
            "bom_list": bom_csv  # 直接传递CSV字符串
        },
        "query": "请根据需求文档和BOM列表生成相应的代码",
        "response_mode": "streaming",
        "user": user_id,
        "conversation_id": conversation_id or ""
    }

    response = requests.post(
        "https://genshinimpact.site/v1/chat-messages",
        headers=headers,
        json=payload,
        stream=True
    )

    full_response = ""
    try:
        for line in response.iter_lines():
            if line:
                decoded_line = line.decode('utf-8')
                if decoded_line.startswith('data:'):
                    json_str = decoded_line.split('data: ')[1]
                    event_data = json.loads(json_str)

                    event_type = event_data.get('event')
                    
                    if event_type in ['message', 'agent_message']:
                        full_response += event_data.get('answer', '')
                    
                    elif event_type == 'message_end':
                        break
                    
                    elif event_type == 'error':
                        raise Exception(f"API错误: {event_data.get('message')}")

    except Exception as e:
        raise RuntimeError(f"处理流式响应时出错: {str(e)}")
    fig, ax = plt.subplots(figsize=(4,3))
    ax.plot([0, 1], [0, 0], 'k-', lw=2)  # 电源到电阻
    ax.plot([1, 2], [0, 0], 'k-', lw=2)  # 电阻到 LED
    ax.plot([2, 2], [0, 1], 'k-', lw=2)  # LED 垂直连线
    ax.text(0.5, -0.1, '电源', ha='center')
    ax.text(1.5, -0.1, '电阻', ha='center')
    ax.text(2.2, 0.5, 'LED', ha='left')
    ax.axis('off')
    fig.tight_layout()

    return full_response, fig
def code_gen():
    code, fig = generate_code(requirement_document=reqdoc_text, bom_csv=bom_text)
    return code

### 模块五 生成原理图代码(可用于LCEDA)

In [11]:
import asyncio
import requests
import json
import time
API_URL = "https://genshinimpact.site/v1/workflows/run"
async def generate_schematic():
    """
    异步生成器函数，调用API生成原理图代码。
    输入：全局变量 reqdoc_text 和 bom_text，会封装为键 requirement 和 bom。
    输出：单个字符串变量"原理图"，即生成的原理图代码。
    """
    global reqdoc_text, bom_text
    log_text = ""

    def add_log(message):
        nonlocal log_text
        log_text += message + "\n"
        print(message)

    # API和用户信息（api_key 可在此修改为适合生成原理图的密钥）
    api_key = "app-deBVvv9f3gQAPWMrS0IZDNTS"  # 请根据需要修改此处的密钥
    user_id = "WPP_JKW"
    headers = {
        "Authorization": f"Bearer {api_key}",
        "Accept": "text/event-stream",
        "Content-Type": "application/json"
    }
    
    payload = {
        "inputs": {
            "requirement": reqdoc_text,
            "bom": bom_text
        },
        "response_mode": "streaming",
        "user": user_id
    }
    schematic_result = ""
    try:
        add_log("🚀 开始生成原理图代码...")
        await asyncio.sleep(1)  # 模拟等待
        # 使用与模块1相同的 API_URL
        with requests.post(API_URL, headers=headers, json=payload, stream=True, timeout=120) as resp:
            resp.raise_for_status()
            for raw_line in resp.iter_lines():
                if not raw_line or raw_line == b': ping\n':
                    continue
                try:
                    decoded_line = raw_line.decode("utf-8").strip()
                    if not decoded_line.startswith("data:"):
                        continue
                    json_str = decoded_line[5:].strip()
                    if not json_str:
                        continue
                    event_data = json.loads(json_str)
                    # 处理不同类型的事件
                    if event_data.get("event") == "node_started":
                        msg = f"▷ 正在处理节点：{event_data['data'].get('title', '未知节点')}"
                        add_log(msg)
                    elif event_data.get("event") == "node_finished":
                        node_status = event_data['data'].get('status')
                        status_icon = "✓" if node_status == "succeeded" else "✗"
                        msg = f"{status_icon} 节点状态：{node_status} | 耗时：{event_data['data'].get('elapsed_time', 0):.1f}s"
                        add_log(msg)
                    elif event_data.get("event") == "workflow_finished":
                        schematic_result = event_data.get("data", {}).get("outputs", {})
                        schematic_result = {k: v or "" for k, v in schematic_result.items()}
                except json.JSONDecodeError as e:
                    add_log(f"⚠ JSON 解析失败 | 原始数据：{json_str[:100]}... | 错误：{str(e)}")
                except KeyError as e:
                    add_log(f"⚠ 数据字段缺失：{str(e)}")
        add_log("✅ 原理图代码生成完成.")
        return schematic_result.get("picpic", "")
    except requests.exceptions.RequestException as e:
        add_log(f"❌ 请求失败: {str(e)}")
        return ""
    except Exception as e:
        add_log(f"⚠ 发生意外错误: {str(e)}")
        return ""

### 最后： 整体调用

In [12]:


css = """
    .gradio-container {
        background-color: rgb(229,246,255,0.5);
        font-family:'Microsoft YaHei'; 
        padding: 20px;
    }

    #background_image {
        width: 100%; /* 保持图片宽度不变 */
        height: ; /* 图片高度为视口高度的80% */
        border-radius: 30px; /* 增加圆角边框 */
        margin-top: -3vh; /* 向上压缩的视口高度 */
        margin-left: auto;
        margin-right: auto; /* 居中显示 */
        display: block; /* 确保图片占据整个容器 */
    }
"""

with gr.Blocks(theme=gr.themes.Soft(), css=css, title=" 电路预测和生成" ) as demo:
    
    gr.Image(
        value="./src/background.jpg", 
        show_label=False, 
        container=False,
        show_download_button=False,
        show_share_button=False,
        show_fullscreen_button=False,
        elem_id="background_image"
    )

    # 第一部分：图片上传、文本输入、开始按钮与结果显示
    with gr.Row():
        gr.Markdown("""
                <div style="  border-radius: 10px; text-align: center;">
                    <h2 style="color:#000;">Step 1. 用户上传图片 </h2>
                </div>
                """)
    
    with gr.Row():
        # 指定了列的缩放比例和最小宽度，
        with gr.Column(scale=1, min_width=300, elem_classes="align-column"): 

            # 添加类名，
            image_input = gr.Image(label="上传图片", type="pil", elem_id="image-input") 
            # 给Textbox添加了elem_classes="textbox-input"类名，
            text_input = gr.Textbox(label="输入文本", placeholder="这是什么，用中文回答", elem_classes="textbox-input") 

            #设置了variant="primary"样式（主样式）\类名，
            start_button = gr.Button("开始", variant="primary", elem_classes="start-button") 
            
        # 指定了列的缩放比例和最小宽度，
        with gr.Column(scale=1, min_width=300, elem_classes="align-column"): 
            
            result_text = gr.Textbox(label="图片分析结果", lines=15, elem_classes="result-text", show_label=True, max_lines=20) 

    gr.Markdown("<hr>")



    # 第二部分：搜索全球数据库
    with gr.Row():
        gr.Markdown("""
            <div style=" padding: 20px; border-radius: 10px; text-align: center;">
                <h2 style="color:#000;">Step 2. 搜索全球数据库 </h2>
            </div>
        """)
    with gr.Row():
        gr.Markdown("点击分析按钮，查看相关电器元件，点击分析，然后点击计算总价")
    analyze_button = gr.Button("分析电器元件", variant='primary',  elem_classes="analyze-button") 
    with gr.Row():
        search_result = gr.Textbox(label="分析结果")
    with gr.Row():
        website_choices = ["立创商城", "华秋商城", "德捷电子", "云汉芯城", "PCBWay"]
        selected_website = gr.Radio(choices=website_choices, label="选择网站", value="立创商城")
    with gr.Row():
        device_details_output = gr.Dataframe(
            headers=["器件名称", "单价", "数量", "总价"],
            label="器件详情",
            interactive=True
        )
    with gr.Row():
        calculate_button = gr.Button("计算总价", variant='secondary')
        total_price_display = gr.Number(label="总价", value=0)

    gr.Markdown("<hr>")

    # 第三部分：电路代码生成
    with gr.Row():
        gr.Markdown("""
                <div style=" padding: 20px; border-radius: 10px; text-align: center;">
                    <h2 style="color:#000;">Step 3. 生成电路代码 </h2>
                </div>
        """)
    with gr.Row():
        code_output = gr.Code(label="生成的电路代码", language="python")
            
        
    generate_code_button = gr.Button("生成电路", variant="primary",)
    gr.Markdown("<hr>")

    # 第四部分：部署指南
    with gr.Row():
        gr.Markdown("""
                <div style=" padding: 20px; border-radius: 10px; text-align: center;">
                    <h2 style="color:#000;">Step 4. 生成部署指南 </h2>
                </div>
        """)

    with gr.Column():
        # scale=1 尽可能占据列宽度
        #line=文本框在界面上初始显示的行数。它决定了文本框在页面上的高度，用户第一眼看到的文本框可以容纳多少行文本
        deployment_text = gr.Textbox(scale=1 ,label="部署指南 / 说明", lines=10, placeholder="点击按钮后，将会生成项目部署指南，生成中ing...")
        generate_deployment_button = gr.Button("生成部署指南", variant='primary')
    
    # 新增语音播放组件
    audio_output = gr.Audio(label="朗读部署指南", type="filepath", )
    generate_deployment_button = gr.Button("朗读部署指南", variant='primary')



    
    #第五部分 生成原理图代码
    with gr.Row():
        gr.Markdown("""
                <div style=" padding: 20px; border-radius: 10px; text-align: center;">
                    <h2 style="color:#000;">Step 5. 生成原理图代码 </h2>
                </div>
        """)
    with gr.Row():
        schematic_code_output = gr.Code(label="生成的原理图代码", language="python")
    with gr.Row():
        generate_schematic_button = gr.Button("生成原理图代码", variant='primary')
    
        
    # 网页上的按钮事件绑定
    # 注意：删除了 stream=True 参数
    start_button.click(
        fn=process_image,
        inputs=[image_input, text_input],
        outputs=result_text,
        show_progress=True
    )

    analyze_button.click(
        fn=analyze_components,
        inputs=None,
        outputs=[search_result, device_details_output]
    )

    selected_website.change(
        fn=search_database,
        inputs=[selected_website],
        outputs=[search_result, device_details_output]
    )

    calculate_button.click(
        fn=calculate_total_price,
        inputs=[device_details_output],
        outputs=[device_details_output, total_price_display]
    )

    generate_code_button.click(
        fn=code_gen,
        inputs=None,
        outputs=code_output
    )

    generate_deployment_button.click(
        fn=generate_and_read_deployment_guide,
        inputs=None,
        outputs=[deployment_text, audio_output]
    )
    # 绑定生成原理图代码按钮
    generate_schematic_button.click(
        fn=generate_schematic,
        inputs=None,
        outputs=schematic_code_output
    )
    # 启用队列，支持异步生成器函数的流式更新
    demo.queue()
 
demo.launch(share=True)


* Running on local URL:  http://127.0.0.1:7863
* Running on public URL: https://5b49533fb3a767d6a7.gradio.live

This share link expires in 72 hours. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




  fig.tight_layout()
  fig.tight_layout()
  fig.tight_layout()
