In [2]:
from langchain_community.document_loaders.json_loader import JSONLoader
from langchain.text_splitter import CharacterTextSplitter
from langchain_chroma import Chroma
from langchain_community.embeddings import HuggingFaceBgeEmbeddings
from langchain.schema import Document
import json

# Local directory holding the Chinese BGE embedding model.
model_name = "./bge-large-zh-v1.5"
# Run the embedding model on the CPU.
model_kwargs = dict(device="cpu")

hf = HuggingFaceBgeEmbeddings(
    model_name=model_name,
    model_kwargs=model_kwargs,
    encode_kwargs=dict(normalize_embeddings=True),
)

# One persisted Chroma collection stores every kind of record; the records are
# told apart later through their "type" metadata field.
vector_store = Chroma(
    persist_directory="./chroma_combined_db",
    embedding_function=hf,
    collection_name="combined_database",
)

# Helper for reading the JSON source files
def load_json(path):
    """Parse the UTF-8 encoded JSON file at ``path`` and return the decoded object."""
    with open(path, mode="r", encoding="utf-8") as source:
        return json.load(source)

# Upper bound on how many documents are sent to the vector store per call.
_ADD_DOCS_BATCH_SIZE = 500


def _as_document(record, metadata):
    """Wrap one JSON record in a ``Document``.

    The page content is ``str(record)`` with every single-quote character
    removed; ``metadata`` is attached unchanged.
    """
    return Document(page_content=str(record).replace("'", ""), metadata=metadata)


def _store_documents(doc_type, documents):
    """Insert ``documents`` into ``vector_store`` in batches under stable ids.

    Ids have the form ``"<doc_type>:<position>"``. Because they are the same on
    every run, re-running the notebook against the persisted collection is
    expected to overwrite the earlier entries instead of piling up duplicates
    (relies on Chroma upserting by id -- confirm for the installed
    langchain_chroma version). Records removed from the end of a JSON file are
    not deleted from the collection.
    """
    for start in range(0, len(documents), _ADD_DOCS_BATCH_SIZE):
        batch = documents[start:start + _ADD_DOCS_BATCH_SIZE]
        batch_ids = [f"{doc_type}:{start + offset}" for offset in range(len(batch))]
        vector_store.add_documents(batch, ids=batch_ids)


def add_docs():
    """Load the four workshop JSON files and index their records in ``vector_store``.

    Sources are processed in this order, each one stored before the next is read:

    * ``json/product_production_flow.json`` -> type ``production_database``
    * ``json/product_bom.json``             -> type ``bom_database``
    * ``json/inventory_data.json``          -> type ``inventory_database``
      (only the ``workshop_inventory`` list)
    * ``json/QA.json``                      -> type ``Q_and_A``
      (one document per entry of each product's ``common_issues``)

    Raises:
        OSError / json.JSONDecodeError: if a file cannot be read or parsed.
        KeyError: if a record lacks a field used for its metadata.
    """
    # Production flows: one document per product.
    production_flow_data = load_json("json/product_production_flow.json")
    _store_documents("production_database", [
        _as_document(product_data, {"product_name": product_data['product'], "type": "production_database"})
        for product_data in production_flow_data
    ])

    # Bills of materials: one document per product.
    bom_data = load_json("json/product_bom.json")
    _store_documents("bom_database", [
        _as_document(entry, {"product_name": entry['product'], "type": "bom_database"})
        for entry in bom_data
    ])

    # Inventory: one document per material in the workshop inventory list.
    inventory_data = load_json("json/inventory_data.json")
    _store_documents("inventory_database", [
        _as_document(item, {"material_name": item['material_name'], "type": "inventory_database"})
        for item in inventory_data["workshop_inventory"]
    ])

    # Q&A: one document per known issue; entries that are not dicts are skipped.
    q_and_a_data = load_json("json/QA.json")
    qa_documents = []
    for product_data in q_and_a_data:
        if not isinstance(product_data, dict):
            continue
        for issue_data in product_data["common_issues"]:
            qa_documents.append(_as_document(
                issue_data,
                {"product_name": product_data['product'], "issue": issue_data["issue"], "type": "Q_and_A"},
            ))
    _store_documents("Q_and_A", qa_documents)

  from tqdm.autonotebook import tqdm, trange


In [3]:
import os
from getpass import getpass

import erniebot

erniebot.api_type = "aistudio"
# Credentials must not live in the notebook. The token is taken from the
# EB_ACCESS_TOKEN environment variable; when that is unset or empty the user is
# prompted for it without echoing the input.
# NOTE(review): the token that used to be hard-coded on this line has been
# exposed in the file history and should be revoked.
erniebot.access_token = os.environ.get("EB_ACCESS_TOKEN") or getpass("ERNIE Bot access token: ")

add_docs()

In [12]:
# Retrieval helper
def retrieve_data(query, c_type):
    """Fetch the top-ranked document of one category for ``query``.

    Args:
        query: Text to run the similarity search with.
        c_type: Value the document's ``type`` metadata must equal.

    Returns:
        The result of invoking a ``vector_store`` retriever configured with
        ``k=1`` and a ``type`` filter; callers index it as a sequence of
        documents.
    """
    search_options = {"k": 1, "filter": {"type": c_type}}  # keep only the best match
    top_match_retriever = vector_store.as_retriever(search_kwargs=search_options)
    return top_match_retriever.invoke(query)


def _first_page_content(docs):
    """Return the text of the first retrieved document, or "" if none was retrieved."""
    return docs[0].page_content if docs else ""


# Handle a user request
def select_vector_db(user_input):
    """Collect the production flow, BOM and inventory text relevant to a request.

    Args:
        user_input: The user's request text, used as the search query.

    Returns:
        A dict with the keys ``production_flow``, ``bom_info`` and
        ``inventory_info``. Each value is the page content of the best match
        for that category, or ``""`` when the search returned nothing (the
        previous code raised ``IndexError`` in that case).
    """
    # Step 1: production flow for the requested product.
    production_flow = _first_page_content(retrieve_data(user_input, "production_database"))

    # Step 2: bill of materials for the requested product.
    bom_info = _first_page_content(retrieve_data(user_input, "bom_database"))

    # Step 3: inventory lookup. The query is built from the BOM *text*; formatting
    # the raw result list would embed its Python repr (class name, metadata dict)
    # in the query. Falls back to the user's own words when no BOM was found.
    inventory_query = f"{bom_info or user_input}库存情况"
    inventory_info = _first_page_content(retrieve_data(inventory_query, "inventory_database"))

    # Step 4: bundle the three pieces.
    return {
        "production_flow": production_flow,
        "bom_info": bom_info,
        "inventory_info": inventory_info,
    }


# Smoke test: run the retrieval pipeline for one sample request and show the result
user_request = "我想要生成可燃气体检测仪开发板80个"
result = select_vector_db(user_request)
print(result)

{'production_flow': '{product: 可燃气体检测仪开发板, production_steps: [{step_name: 原材料准备, description: 获取生产所需的所有原材料和元件，包括PCB板、电阻、电容、IC等。, required_materials: [PCB板, 电阻, 电容, IC], equipment: [自动化仓库机器人]}, {step_name: SMT贴片, description: 将元件贴装到PCB板上，确保贴装位置和方向正确。, required_materials: [PCB板, 元件], equipment: [SMT设备]}, {step_name: 自动光学检测（AOI）, description: 检测已贴装的PCB板，确保元件位置和焊接质量符合要求。, required_materials: [贴装好的PCB板], equipment: [AOI检测设备]}, {step_name: 人工插件, description: 根据设计要求，手动插入特定元件，并进行焊接。, required_materials: [插件元件, 焊锡], equipment: [手工焊接台]}, {step_name: 包装, description: 将合格的板卡进行包装，贴上标签，并准备出货。, required_materials: [包装材料, 标签], equipment: [包装机]}]}', 'bom_info': '{product: 可燃气体检测仪开发板, BOM: [{material_name: PCB板, quantity_per_unit: 1, unit: 块}, {material_name: 电阻, quantity_per_unit: 10, unit: 个}, {material_name: 电容, quantity_per_unit: 8, unit: 个}, {material_name: IC, quantity_per_unit: 4, unit: 个}, {material_name: LED指示灯, quantity_per_unit: 2, unit: 个}, {material_name: 焊锡, quantity_per_unit: 0.2, unit: 克

In [10]:
def ensure_questions(generate_response):
    """Decorator that turns a raw user request into a retrieval-augmented prompt.

    The wrapped function is called with a prompt built from the assistant
    ``role``, the user's request and the data returned by
    ``select_vector_db(user_input)``; extra positional and keyword arguments
    are passed through. The wrapper returns whatever the wrapped function
    returns.

    This decorator is defined before ``generate_response`` because it is
    applied at definition time; the reverse order raises ``NameError`` on a
    fresh kernel.
    """
    from functools import wraps

    @wraps(generate_response)
    def wrapper(user_input, *args, **kwargs):
        rag_json = select_vector_db(user_input)
        # The prompt carries the actual request; it used to hard-code one
        # sample request and ignore what the user typed.
        analysis_prompt = f"""
            您是{role}，用户请求：{user_input}。以下从车间JSON文档中检索出来的信息：{rag_json}

            请严格根据所提供的信息回答，回答内容需要包括原材料等信息，不清楚的内容请明确说明为'不知道'。
            """

        return generate_response(analysis_prompt, *args, **kwargs)

    return wrapper


@ensure_questions
def generate_response(user_input):
    """Send a prompt to ERNIE Bot, echo the streamed reply and return it.

    Args:
        user_input: Message content sent to the model. Because of the
            ``ensure_questions`` decorator, callers pass the raw user request
            and this function receives the expanded prompt.

    Returns:
        The concatenation of all streamed chunks. Each chunk is also printed
        to stdout as it arrives, without a trailing newline.
    """
    # stream=True asks for the reply to be delivered chunk by chunk.
    response = erniebot.ChatCompletion.create(
        model='ernie-3.5',
        messages=[{'role': 'user', 'content': user_input}],
        stream=True,
        top_p=0.95,
        temperature=0.9
    )
    pieces = []
    for chunk in response:
        result = chunk.get_result()
        pieces.append(result)
        print(result, end='', flush=True)

    return "".join(pieces)


# Assistant persona; interpolated into the LLM prompt and the console greeting.
role = "电子仪器制造车间业务助理"

# Main entry point
def main():
    """Run the interactive console chat loop.

    Prints a greeting, then repeatedly reads a line from the user. The words
    ``exit``, ``quit`` and ``stop`` (case-insensitive, surrounding whitespace
    ignored) end the conversation; blank input is ignored. Any other input is
    passed to ``generate_response``, which streams the answer to stdout.
    """
    # Greeting
    print(f"{role}: 欢迎使用{role}")

    while True:
        user_input = input("You: ").strip()

        if user_input.lower() in ["exit", "quit", "stop"]:
            print("Conversation ended.")
            break

        if not user_input:
            # Nothing to ask; avoid a pointless retrieval and model call.
            continue

        # generate_response already prints the reply while it streams, so only
        # the speaker prefix and a closing newline are added here. Printing the
        # returned text again would show every answer twice.
        print(f"{role}: ", end="", flush=True)
        generate_response(user_input)
        print()

# Start the chat loop when this code runs as the main module
if __name__ == "__main__":
    main()