In [None]:
!pip install transformers
!pip install torch


In [None]:
!pip install langchain transformers sentence-transformers Chromadb
!pip install langchain-community
!pip install tqdm

### RAG (加載、分割、嵌入、建立向量庫)

In [None]:
import os
import torch
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.schema import Document
from tqdm import tqdm
from sentence_transformers import SentenceTransformer
from langchain.embeddings import HuggingFaceEmbeddings
from langchain.vectorstores import Chroma

def load_documents(directory):
    documents = []
    supported_formats = ['.txt', '.pdf', '.docx']  # 增加支持的文件格式

    # 遞迴遍歷資料夾
    for root, _, files in os.walk(directory):
        for filename in tqdm(files, desc="Loading documents"):
            file_extension = os.path.splitext(filename)[1].lower()
            if file_extension in supported_formats:
                try:
                    filepath = os.path.join(root, filename)
                    if file_extension == '.txt':
                        with open(filepath, 'r', encoding='utf-8') as file:
                            text = file.read()
                    elif file_extension == '.pdf':
                        # 使用 PyPDF2 或其他 PDF 庫來讀取 PDF
                        # text = read_pdf(filepath)
                        pass
                    elif file_extension == '.docx':
                        # 使用 python-docx 來讀取 DOCX
                        # text = read_docx(filepath)
                        pass
                    filename_without_ext = os.path.splitext(filename)[0]
                    documents.append(Document(page_content=text, metadata={"source": filename_without_ext}))
                except Exception as e:
                    print(f"Error loading {filename}: {e}")
    return documents

def split_documents(documents):
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=256,  # 增加塊大小以包含更多上下文
        chunk_overlap=128,
        length_function=len,
        separators=["\n\n", "\n", " ", ""]  # 自定義分隔符
    )
    chunks = text_splitter.split_documents(documents)
    return chunks

# def create_vectorstore(chunks):
#     device = "cuda" if torch.cuda.is_available() else "cpu"

#     # 使用更先進的嵌入模型
#     embeddings = HuggingFaceEmbeddings(
#         model_name="sentence-transformers/all-mpnet-base-v2",  # 更準確的模型
#         model_kwargs={'device': device}
#     )

#     # 提取文本內容並將其向量化
#     # 假設 chunks 中的每個 Document 對象都有 'page_content' 屬性
#     documents_text = [chunk.page_content for chunk in chunks]
#     vectors = embeddings.embed_documents(documents_text)

#     # 使用 FAISS 構建索引
#     vectorstore = FAISS.from_documents(chunks, embeddings)
#     # 或者使用向量數據創建
#     # vectorstore = FAISS(vectors, chunks)

#     return vectorstore

def create_vectorstore(chunks):
    device = "cuda" if torch.cuda.is_available() else "cpu"

    # 使用更先進的嵌入模型
    embeddings = HuggingFaceEmbeddings(
        model_name="BAAI/bge-m3",  # 更準確的模型sentence-transformers/msmarco-bert-base-dot-v5
        model_kwargs={'device': device}
    )

    # 添加索引參數以提高檢索效率
    vectorstore = Chroma.from_documents(
        chunks,
        embeddings,
        collection_metadata={"hnsw:space": "cosine"}  # 使用餘弦相似度
    )
    return vectorstore

#### 確認切割文本內容

In [None]:
if __name__ == "__main__":
    directory = '/content/drive/MyDrive/暫放'
    # directory = '/content/test'
    documents = load_documents(directory)

    print(f"載入了 {len(documents)} 個文檔")

    chunks = split_documents(documents)

    print(f"文檔被分割成 {len(chunks)} 個chunks")

    # 打印第一個chunk的完整內容
    if chunks:
        first_chunk = chunks[99]
        print("\n第一個Chunk的完整內容:")
        print(f"內容:\n{first_chunk.page_content}")
        print(f"\n長度: {len(first_chunk.page_content)} 字符")
        print(f"來源: {first_chunk.metadata.get('source', '未知')}")

### 初始分割文本及建立向量庫

In [None]:
documents = load_documents("/content/drive/MyDrive/暫放")
chunks = split_documents(documents)
vectorstore = create_vectorstore(chunks)

### Gradio 介面

In [None]:
!pip install langchain_groq
!pip install gradio

In [None]:
import gradio as gr
from langchain.schema.runnable import RunnablePassthrough
from langchain.prompts import PromptTemplate
from langchain_groq import ChatGroq
from langchain.chains import ConversationalRetrievalChain
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate
import os

k = 3
fetch_k = 100


def setup_qa_chain():
    groq_api_key = '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'
    model = 'llama3-8b-8192'
    groq_chat = ChatGroq(groq_api_key=groq_api_key, model_name=model)
    memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
    retriever = vectorstore.as_retriever(search_type="mmr", search_kwargs={"k": k, "fetch_k": fetch_k})
    
    template = """你是一個在幫助使用者回答有關Podcast節目相關內容的智能助手。根據提供的檢索到的資料來回答問題。如果信息不足以回答問題，請直接回答"RAG資料庫沒有您想要的資料"，並須注意以繁體中文回答。

檢索資料信息（包括節目標題）：
{context}

聊天歷史：
{chat_history}

當前問題：{question}

回答指南：
1.需要先將當前問題:{question}做一次prompt engineering
2. 僅使用檢索資料中的信息來回答問題。如果資料不足，請直接回答"RAG資料庫沒有您想要的資料"。
3. 回答務必包含以下元素：
   a) 提及的具體內容要點
   b) 每個內容要點對應的時間戳（格式：MM:SS~MM:SS）
   c) 節目標題（格式：（節目標題：[完整標題]））
4. 回答格式示例：
   "根據檢索資料，[內容摘要1]（時間戳）。此外，[內容摘要2]（時間戳）。[如有更多內容，繼續列舉]。（節目標題：[完整標題]）"
5. 回答要簡潔明瞭，以繁體中文表達。
6. 不要添加任何檢索資料中沒有的信息。
請根據上述指南回答問題：
"""

    document_prompt = PromptTemplate(
        input_variables=["page_content", "source"],
        template="內容: {page_content}\n來源: {source}"
    )
    prompt = ChatPromptTemplate.from_template(template)

    qa_chain = ConversationalRetrievalChain.from_llm(
        llm=groq_chat,
        retriever=retriever,
        memory=memory,
        combine_docs_chain_kwargs={
            "prompt": prompt,
            "document_variable_name": "context",
            "document_prompt": document_prompt
        }
    )
    
    return qa_chain, retriever

qa_chain, retriever = setup_qa_chain()

def get_program_list():
    # 模擬從資料庫獲取節目列表
    programs = [
        "1: 老高與小茉",
        "2: Joe &amp; Jet 未過濾的 with Jason",
        # "節目3: 節目標題3"
    ]
    return "\n".join(programs)

def chat_function(message, history):
    try:
        # 執行檢索
        results = retriever.invoke(message)
        # 根據提供的問題進行檢索和回答
        response = qa_chain.invoke({"question": message, "chat_history": history})
        answer = response['answer']
        
        # 構建來源信息
        sources = "\n可參考下方節目集數：\n"
        for idx, result in enumerate(results):
            filename = os.path.splitext(result.metadata['source'])[0]
            sources += f"Result {idx+1}: {filename}\n"
        
        return answer + "\n\n" + sources
    except Exception as e:
        return f"發生錯誤: {str(e)}\n很抱歉，我無法處理您的問題。請再試一次或換個問題。"

# 創建 Gradio 界面
with gr.Blocks() as iface:
    # 顯示節目清單
    gr.Markdown(f"## 目前資料庫中的節目有：\n{get_program_list()}\n\n請在下方提問：")

    # 創建聊天界面
    chatbot = gr.ChatInterface(
        chat_function,
        title="Podcast Q&A Assistant",
        description="Ask questions about podcast content, and I'll provide answers based on the retrieved information.",
        theme="soft",
        examples=[
            "林書豪這個賽季遇到了什麼困難？",
            "請告訴我這個節目討論了哪些主題？",
            "這集節目中有提到哪些重要的觀點？"
        ],
        retry_btn="重試",
        undo_btn="撤銷",
        clear_btn="清除"
    )

iface.launch()