In [3]:
from langchain_community.chat_models import ChatTongyi #阿里通义前问的聊天模型
from langchain_community.vectorstores import FAISS # FASII向量数据库
from langchain_community.embeddings import DashScopeEmbeddings # 阿里的词嵌入
from langchain_text_splitters import CharacterTextSplitter # 文本切分工具
from langchain_community.document_loaders import TextLoader #读取本地txt文件
from langchain.chains import RetrievalQA
import os

# 加载通义千问API_KEY
YOU_API_KEY = os.environ.get("DASHSCOPE_API_KEY")
api_key = YOU_API_KEY

# 1.初始化通义千问模型
llm = ChatTongyi(model_name = "qwen-turbo",dashscope_api_key = api_key)

# 2.读取数据
file_path = 'kb.txt'
loader = TextLoader(file_path,encoding="utf-8")
docs = loader.load()

# 3.切分知识库文本：每300个字符切一块，块之间重叠20字符
# 这是为了保证每块文本既不太大（方便检索），又有上下文（避免断句）
text_splitter = CharacterTextSplitter(chunk_size = 300,chunk_overlap = 20)
documents = text_splitter.split_documents(docs)

# 4.初始化向量化工具：使用阿里的DashScope API,把文本块转化为向量
embedding = DashScopeEmbeddings(dashscope_api_key = api_key)

# 5. 用FAISS创建一个向量数据库，把所有文本块存进去
# FAISS 负责做"相似度检索"，快速查找与用户提问最接近的内容块
db = FAISS.from_documents(documents,embedding)

# 6. 创建 RAG 问答链
# RetrievalQA 就是 Langchain 内置的 RAG 实现
# 先用FAISS 检索最相关的文本块，然后喂给大模型(qwen-turbo)生成最终回答
qa = RetrievalQA.from_chain_type(
    llm = llm, #大语言模型
    chain_type = "stuff",# 简单的RAG模式，把检索到的文档直接拼接起来
    retriever = db.as_retriever()# 检索器，负责查找相似知识块
)

# 7. 用户提问
query = "公司员工请假的流程是怎么样的？"

# 8. 调用QA链：先检索，再生成
result = qa.invoke({"query":query})

# 9. 打印回答
print(result["result"])

根据公司员工请假管理制度，员工请假的流程如下：

1. 员工需要提前向部门负责人或部门分管领导递交书面申请，并填写《请假单》。
2. 《请假单》需要由职务代理人和主管领导签准后送行政人事部备案。
3. 如果是因急事或身体不适临时请假，员工应在上班前电话告知部门主管，返岗后补办请假手续。
4. 请假期间所属事务及责任由职务代理人承担。
5. 员工休假期满后，须及时到部门负责人或部门分管领导处销假，否则按旷工处理。
6. 请假申请经相关领导审批同意后，请假员工须在休假前持《请假单》及时到行政人事部备案，否则请假不予生效。


In [4]:
# 导入必要的库
from langchain_community.chat_models import ChatTongyi
from langchain_community.vectorstores import FAISS
from langchain_community.embeddings import DashScopeEmbeddings
from langchain_text_splitters import CharacterTextSplitter
from langchain_community.document_loaders import TextLoader
from langchain.chains import RetrievalQA
from IPython.display import display, Markdown
import ipywidgets as widgets
from pathlib import Path
import os

YOU_API_KEY = os.environ.get("DASHSCOPE_API_KEY")
api_key = YOU_API_KEY

# 创建交互式组件
print("知识库问答系统 - Jupyter版本")
print("="*50)

# 文件选择部件,widget.Dropdown-创建选择下拉框，可以放置所有文件，选择其中一个进行读取
file_path_widget = widgets.Dropdown(
    options=['kb.txt','ab.txt'],
    value = 'kb.txt',
    placeholder='输入文件路径',
    description='文件路径:',
    disabled=False
)
'''
# widgets.Text-创建文本框，只能编辑文件路径，不可下拉选择文件
file_path_widget = widgets.Text(
    value = 'kb.txt',
    placeholder='输入文件路径', # 文本为空时显示
    description='文件路径:',
    disabled=False
)
'''


encoding_widget = widgets.Dropdown(
    options=['utf-8', 'gbk', 'gb2312', 'ascii'],
    value='utf-8',
    description='编码:',
    disabled=False
)

chunk_size_widget = widgets.IntSlider(
    value=300,
    min=100,
    max=1000,
    step=50,
    description='块大小:',
    disabled=False
)

chunk_overlap_widget = widgets.IntSlider(
    value=20,
    min=0,
    max=100,
    step=5,
    description='重叠:',
    disabled=False
)

# 显示部件
display(file_path_widget)
display(encoding_widget)
display(chunk_size_widget)
display(chunk_overlap_widget)

# 初始化按钮
button = widgets.Button(description="初始化系统")
output = widgets.Output()

display(button, output)

def on_button_clicked(b):
    with output:
        output.clear_output()
        
        file_path = file_path_widget.value
        encoding = encoding_widget.value
        chunk_size = chunk_size_widget.value
        chunk_overlap = chunk_overlap_widget.value
        
        print(f"正在初始化系统...")
        print(f"文件路径: {file_path}")
        print(f"编码: {encoding}")
        print(f"块大小: {chunk_size}")
        print(f"块重叠: {chunk_overlap}")
        print("-" * 30)
        
        # 检查文件是否存在
        if not os.path.exists(file_path):
            print(f"❌ 错误: 文件 '{file_path}' 不存在")
            return
            
        try:
            # 尝试读取文件
            with open(file_path, 'r', encoding=encoding) as f:
                content = f.read()
                
            print("✅ 文件读取成功!")
            print(f"文件大小: {len(content)} 字符")
            print(f"行数: {len(content.splitlines())}")
            
            # 初始化模型
            print("正在初始化模型...")
            llm = ChatTongyi(model_name="qwen-turbo", dashscope_api_key=api_key)
            
            # 加载和分割文档
            print("正在处理文档...")
            loader = TextLoader(file_path, encoding=encoding)
            docs = loader.load()
            
            text_splitter = CharacterTextSplitter(
                chunk_size=chunk_size, 
                chunk_overlap=chunk_overlap
            )
            documents = text_splitter.split_documents(docs)
            
            print(f"文档已分割为 {len(documents)} 个块")
            
            # 创建嵌入和向量存储
            print("正在创建向量数据库...")
            embeddings = DashScopeEmbeddings(model="text-embedding-v1", dashscope_api_key=api_key)
            vectorstore = FAISS.from_documents(documents, embeddings)
            
            # 创建检索链
            qa_chain = RetrievalQA.from_chain_type(
                llm=llm,
                chain_type="stuff",
                retriever=vectorstore.as_retriever()
            )
            
            print("✅ 系统初始化完成!")
            print("-" * 30)
            
            # 保存QA链供后续使用
            globals()['qa_chain'] = qa_chain
            
            # 显示前几个文档块
            print("\n文档块预览:")
            for i, doc in enumerate(documents[:3]):
                print(f"\n--- 块 #{i+1} ---")
                print(doc.page_content[:200] + "..." if len(doc.page_content) > 200 else doc.page_content)
                
            if len(documents) > 3:
                print(f"\n... 还有 {len(documents)-3} 个块未显示")
                
            # 添加问题输入框
            print("\n🎯 现在您可以提问了!")
            question_input = widgets.Text(
                placeholder='输入您的问题',
                description='问题:',
                disabled=False
            )
            
            ask_button = widgets.Button(description="提问")
            answer_output = widgets.Output()
            
            display(question_input, ask_button, answer_output)
            
            def on_ask_clicked(b):
                with answer_output:
                    answer_output.clear_output()
                    question = question_input.value
                    if question:
                        print(f"问题: {question}")
                        print("正在思考...")
                        try:
                            result = qa_chain.run(question)
                            print(f"答案: {result}")
                        except Exception as e:
                            print(f"❌ 出错: {str(e)}")
                    else:
                        print("请输入问题")
            
            ask_button.on_click(on_ask_clicked)
            
        except UnicodeDecodeError:
            print(f"❌ 编码错误: 无法使用 {encoding} 解码文件，请尝试其他编码")
        except Exception as e:
            print(f"❌ 初始化过程中出错: {str(e)}")

button.on_click(on_button_clicked)

知识库问答系统 - Jupyter版本


Dropdown(description='文件路径:', options=('kb.txt', 'ab.txt'), value='kb.txt')

Dropdown(description='编码:', options=('utf-8', 'gbk', 'gb2312', 'ascii'), value='utf-8')

IntSlider(value=300, description='块大小:', max=1000, min=100, step=50)

IntSlider(value=20, description='重叠:', step=5)

Button(description='初始化系统', style=ButtonStyle())

Output()

Output()