In [1]:
import os
from langchain.docstore.document import Document
from langchain.text_splitter import RecursiveCharacterTextSplitter
import docx2txt
from langchain_community.document_loaders import PyPDFLoader
from urllib.request import urlopen, Request
from bs4 import BeautifulSoup

In [2]:
# Folder that holds the source documents to index.
folder = 'data'

# Keep the on-disk spelling of each file name: lower-casing the name before
# joining it back into a path breaks on case-sensitive filesystems. Sorting
# makes the resulting chunk order reproducible between runs.
flst = sorted(f for f in os.listdir(folder) if os.path.isfile(os.path.join(folder, f)))

docs = []
for file in flst:
    pth = os.path.join(folder, file)
    # Only the extension is normalised; it is empty for names without a dot.
    surfix = os.path.splitext(file)[1].lower().lstrip('.')
    if surfix in ('docx', 'doc'):
        # docx2txt returns the whole document as a single string.
        # NOTE(review): legacy binary .doc files are routed here as well - confirm docx2txt can read them.
        document = docx2txt.process(pth)
        documents = [Document(page_content=document, metadata={'source': file})]
    elif surfix == 'pdf':
        # Was `surfix in ('pdf')`, i.e. a substring test against the string 'pdf'.
        loader = PyPDFLoader(pth)
        documents = loader.load()
    elif surfix in ('htm', 'html'):
        # The context manager closes the handle even if parsing fails later.
        with open(pth, encoding='utf-8') as html_file:
            html = html_file.read()
        soup = BeautifulSoup(html, features="html.parser")
        # Drop script/style elements so only visible text is indexed.
        for script in soup(["script", "style"]):
            script.extract()
        documents = [Document(page_content=soup.get_text().replace('\n\n',''), metadata={"source": file})]
    else:
        # Unsupported type: skip it instead of re-appending the previous
        # file's documents (or raising NameError on the very first file).
        continue
    docs.extend(documents)

# Split every loaded document into overlapping chunks for embedding.
docs = RecursiveCharacterTextSplitter(chunk_size=1000, chunk_overlap=150).split_documents(docs)
docs[:3]

[Document(page_content='Taro Balls, Grass Jelly Shaved Ice, Grass Jelly\n芋圓、仙草刨冰、仙草Taro Balls, Sesame Rice Balls, Taro, Red Beans, Boba, Red Bean Soup\n芋圓、芝麻湯圓、芋頭、紅豆、珍珠、紅豆湯\nMini Taro Balls, Kidney Beans, Boba, Grass Jelly Soup\n小芋圓、花豆、珍珠、燒仙草Taro Paste, Sweet Potato Taro Balls, Rice Balls, Purple Rice Soup\n芋泥球、芋薯圓、湯圓、紫米粥Icy Grass Jelly Signature  \u0cd0Hot Red Bean Soup Signature  Ԍಷ\nHot Grass Jelly Soup Signature  \u0cd0ደ̀ণCold Purple Rice Soup Signature  \u0cd0ഓϷഎ\nHot Almond Soup Signature  \u0cd0Ҿʠደ\nMini Taro Balls, Kidney Beans, Boba, Almond Flakes,\nAlmond Soup  小芋圓、花豆、珍珠、杏仁角、杏仁燒Taro, Sweet Potato Taro Balls, Rice Balls, Purple Rice Soup\n芋頭、芋薯圓、湯圓、紫米粥Hot Purple Rice Soup Signature  \u0cd0ഓϷഎTaro Balls, Taro Paste, Sweet Potato Taro Balls, Boba, Shaved Ice\n芋圓、芋泥球、芋薯圓、珍珠、刨冰Icy Taro Ball Signature  \u0cd0\nTaro Balls, Taro Paste, Ice Cream, Grass Jelly, Grass Jelly Shaved Ice\n芋圓、芋泥球、冰淇淋、仙草、仙草刨冰Double Taro Signature  \u0cd0OUR SIGNATURE SERIES\n*with 1 creamer 附一顆奶油球\n*with 1 c

### Vector Database

In [3]:
from langchain_community.vectorstores import Chroma
# from langchain.embeddings import HuggingFaceEmbeddings
# from langchain_community.embeddings import GPT4AllEmbeddings
from langchain_google_genai import GoogleGenerativeAIEmbeddings

In [4]:
# Credentials must not be stored in the notebook. Use GOOGLE_API_KEY from the
# environment when it is already set; otherwise ask for it interactively
# without echoing it into the cell output.
# NOTE(review): a literal key used to be assigned on this line; treat that key
# as leaked and revoke it.
from getpass import getpass

if not os.environ.get("GOOGLE_API_KEY"):
    os.environ["GOOGLE_API_KEY"] = getpass("GOOGLE_API_KEY: ")

In [5]:
# Embedding model used to vectorise the document chunks.
embeddings = GoogleGenerativeAIEmbeddings(model="models/embedding-001")

# Index every chunk in a Chroma collection (no persist_directory is passed).
db = Chroma.from_documents(documents=docs, embedding=embeddings)

# Show the underlying collection as the cell output.
db._collection

Collection(name=langchain)

In [6]:
# Exclusive upper bound on the similarity score of a retrieved chunk; it is
# handed to rag_ext as `doc_score_upper` further down.
db_threshold: float = 0.35

# Number of chunks to request per query. The "RAG parameters" cell assigns
# this name again before it is first used.
num_docs: int = 10

### LLM

In [7]:
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain.chains import RetrievalQA,  ConversationalRetrievalChain

In [8]:
# Chat model shared by the tool-call step and the final answer step.
llm = ChatGoogleGenerativeAI(
    model='gemini-1.5-pro',
    temperature=0,
    convert_system_message_to_human=True,
)

In [9]:
# Smoke test: send one free-form question straight to the model, without any
# retrieval, and display the returned message.
llm.invoke(input='who is alice?')

AIMessage(content="Alice can refer to many different people and fictional characters.  To help me answer your question, could you please be more specific?  For example, are you asking about:\n\n* **Alice in Wonderland?**  This is likely the most common association, referring to the protagonist of Lewis Carroll's children's books.\n* **A specific person you know?**  If so, I obviously won't have access to personal information about them.\n* **A character from another book, movie, or game?**\n* **A historical figure?**  There have been many women named Alice throughout history.\n* **Alice (artificial intelligence)?**  There might be an AI system with that name.\n")

### RAG

In [10]:
import CBextension, json

In [11]:
# RAG parameters
num_docs = 5 # vdb param: passed to rag_ext as `doc_num` when `qa` is created below
# chain_type = 'stuff'
# Tool-selection prompt. In English it tells the model: if the last Question
# can be answered with the function calls listed under Context, reply in the
# Example format (a JSON list of call strings); otherwise reply with an empty
# string. rag_ext fills in {QA_history} and {question} by plain text
# replacement, so the other braces in this text are left untouched.
template_calls = """如果答最后一个Question可以用Context里的function calls回答，就按照Example的格式回复，如果不能就回复空字符串。
Example:
["CBextension.sales_prediction(3)", "CBextension.sales_prediction(6)"]

Context:
{call: CBextension.sales_prediction(n),
description: predict monthly sales figure in n months,
return: a dictionary of {value: str, description: str},
parameters: {n: number of months}}

{QA_history}

Question: {question}
Answer:
"""

# Answer prompt. In English it tells the model: you are the assistant to the
# manager of a Meet Fresh shop in Dallas, USA; answer the last Question using
# the Context below together with marketing/PR knowledge; anything under
# '|analysis retults' is already verified and should not be doubted.
# NOTE(review): the 'retults' spelling matches the marker rag_ext.query appends
# to the context - change both together or not at all.
template = """你是美国达拉斯一家鲜芋仙店铺的店长助理，请结合以下Context和市场公关知识回答最后一个Question. ‘|analysis retults’里的内容都已经过验证，不用怀疑。
Context:

{context}

{QA_history}

Question: {question}
Answer:
"""

In [12]:
class rag_ext:
    """Small retrieval-augmented QA helper with an optional tool-call step.

    For every query the helper retrieves scored documents from ``db``, keeps
    those whose score lies strictly between the optional ``doc_score_lower``
    and ``doc_score_upper`` bounds, optionally asks the LLM which
    ``CBextension`` calls to run, and finally asks the LLM to answer with the
    filled-in prompt template.

    Templates may contain the placeholders ``{question}``, ``{QA_history}``
    and ``{context}``; any other brace in a template is left untouched.
    """

    # Class-level defaults, kept so attribute access on the class still works.
    llm = None
    db = None
    call_prompt_template = None
    prompt_template = ''
    history_length = 0
    doc_num = 0
    # Immutable placeholder: each instance gets its own list in __init__, so
    # instances can never share (or mutate) one class-level history.
    chat_history = ()
    # At most this many retrieved documents are placed in the prompt.
    max_context_docs = 10

    def __init__(self, llm, db, prompt_template, **kwargs):
        """Store the collaborators and tuning options.

        Args:
            llm: object whose ``invoke(prompt)`` returns something with a
                ``content`` attribute.
            db: object exposing ``similarity_search_with_score(query, k=...)``
                that returns ``(document, score)`` pairs.
            prompt_template: template for the final answer prompt.
            **kwargs: ``prompt_template_call`` (template for the tool-call
                step, default ``None`` = step disabled), ``history_length``
                (number of past exchanges kept, default 50), ``doc_num``
                (documents requested per query, default 10). Everything else,
                e.g. ``doc_score_lower`` / ``doc_score_upper``, is kept in
                ``self.params``.
        """
        self.llm = llm
        self.db = db
        self.call_prompt_template = kwargs.pop("prompt_template_call", None)
        self.prompt_template = prompt_template
        self.history_length = kwargs.pop('history_length', 50)
        self.doc_num = kwargs.pop('doc_num', 10)
        self.params = kwargs  # remaining options, looked up lazily in query()
        # Start with an empty history: a seeded ('', '') pair would put a blank
        # "Question:/Answer:" exchange into every prompt.
        self.chat_history = []

    @staticmethod
    def _fill_template(template, **values):
        """Replace ``{name}`` placeholders for the given names in one pass.

        A single pass means text that is substituted in (for example a user
        query that happens to contain ``{context}``) is never expanded again.
        Placeholders whose name is not in ``values`` are left as they are.
        """
        import re

        if not values:
            return template
        pattern = '|'.join(re.escape('{' + name + '}') for name in values)
        # A function replacement keeps backslashes in the values literal.
        return re.sub(pattern, lambda match: values[match.group(0)[1:-1]], template)

    @staticmethod
    def _eval_call(expr):
        """Evaluate one model-proposed call after checking that it is harmless.

        Only expressions of the form ``CBextension.<public_name>(<literals>)``
        are accepted; anything else raises ``ValueError`` (or ``SyntaxError``
        when the text is not a valid expression) before any code runs.
        """
        import ast

        if not isinstance(expr, str):
            raise ValueError('call must be a string')
        expr = expr.strip()
        node = ast.parse(expr, mode='eval').body
        target = node.func if isinstance(node, ast.Call) else None
        allowed = (
            isinstance(target, ast.Attribute)
            and isinstance(target.value, ast.Name)
            and target.value.id == 'CBextension'
            and not target.attr.startswith('_')
        )
        if not allowed:
            raise ValueError('unsupported call: ' + expr)
        # Arguments must be plain literals; literal_eval raises ValueError otherwise.
        for positional in node.args:
            ast.literal_eval(positional)
        for keyword in node.keywords:
            if keyword.arg is None:  # rejects **mapping expansion
                raise ValueError('unsupported call: ' + expr)
            ast.literal_eval(keyword.value)
        # NOTE(security): eval() on model output is dangerous in general. It is
        # kept here only for expressions that passed the whitelist above, and
        # it runs without builtins. CBextension is resolved from this module's
        # globals at call time.
        return eval(expr, {'__builtins__': {}, 'CBextension': CBextension})

    def _run_calls(self, payload):
        """Run the JSON list of call strings in ``payload`` and format the results."""
        calls = json.loads(payload)
        if not isinstance(calls, list):
            raise ValueError('expected a JSON list of call strings')
        results = ''
        for expr in calls:
            outcome = self._eval_call(expr)
            results += '|' + outcome['description'] + ':' + str(outcome['value'])
        return results

    def query(self, query):
        """Answer ``query`` using retrieved context and the recent chat history.

        Returns:
            dict with ``answer`` (str), ``source_documents`` (list of
            ``{'doc', 'score'}`` dicts, best score first) and
            ``generated_question`` (the prompt text). When no document passes
            the score filter the LLM is not called, the answer is
            ``'No reference is given'`` and the prompt still contains its
            ``{context}`` placeholder.
        """
        # Keep only the most recent exchanges. A plain `[-n:]` slice returns
        # the WHOLE list for n == 0, so that case is handled explicitly.
        if self.history_length > 0:
            self.chat_history = list(self.chat_history[-self.history_length:])
        else:
            self.chat_history = []
        qa_history = '\n\n'.join('Question:' + q + '\nAnswer: ' + a for q, a in self.chat_history)
        fields = {'question': query, 'QA_history': qa_history}

        # Retrieve, order by ascending score and apply the exclusive bounds.
        hits = self.db.similarity_search_with_score(query, k=self.doc_num)
        lower = self.params.get('doc_score_lower', float('-inf'))
        upper = self.params.get('doc_score_upper', float('inf'))
        ref_docs = [
            {'doc': hit[0], 'score': hit[1]}
            for hit in sorted(hits, key=lambda hit: hit[1])
            if lower < hit[1] < upper
        ]
        if len(ref_docs) < 1:
            return {
                'answer': 'No reference is given',
                'source_documents': ref_docs,
                'generated_question': self._fill_template(self.prompt_template, **fields),
            }

        used_docs = ref_docs[:self.max_context_docs]
        context = '\n\n'.join(entry['doc'].page_content for entry in used_docs)

        if self.call_prompt_template:
            call_prompt = self._fill_template(self.call_prompt_template, **fields)
            answer = self.llm.invoke(call_prompt)
            if len(answer.content) > 0:
                try:
                    context += '|analysis retults:' + self._run_calls(answer.content)
                except Exception:
                    # Best effort: an unparsable, rejected or failing call must
                    # not abort the answer, so the raw reply is recorded instead.
                    context += '|analysis break:' + answer.content

        prompt = self._fill_template(self.prompt_template, context=context, **fields)
        answer = self.llm.invoke(prompt)
        self.chat_history.append((query, answer.content))
        return {'answer': answer.content, 'source_documents': used_docs, 'generated_question': prompt}

# Shared RAG helper used by the chat UI: retrieves `num_docs` chunks, keeps
# those scoring below `db_threshold`, remembers the last 10 exchanges and
# enables the tool-call step via `template_calls`.
qa = rag_ext(
    llm=llm,
    db=db,
    prompt_template=template,
    prompt_template_call=template_calls,
    doc_num=num_docs,
    doc_score_upper=db_threshold,
    history_length=10,
)

### BotUI

In [13]:
import panel as pn
import param

# Appearance of Panel's loading indicator (spinner style and colour); both are
# set before pn.extension() is called.
pn.config.loading_spinner = 'petal'
pn.config.loading_color = 'black'
# NOTE(review): presumably initialises Panel for this notebook session - confirm
# whether the config values above must precede this call.
pn.extension()

In [14]:
class conv_rag():
    """Callback holder that turns each submitted query into chat rows."""

    # Rows rendered so far (prompt / answer / query triples).
    panels = []

    def convchain(self, query):
        """Run ``query`` through the module-level ``qa`` helper and render the chat.

        Returns a scrollable ``pn.WidgetBox``. For an empty query it holds a
        single blank 'Query:' row; otherwise it holds every stored row with
        the newest on top. Also clears the module-level ``inp`` widget.
        """
        # Trim the backlog to the 15 most recent rows before adding new ones.
        self.panels = self.panels[-15:]

        if not query:
            blank = pn.pane.Markdown("", width=1500, styles={'background-color': '#e6f2ff'})
            return pn.WidgetBox(pn.Row('Query:', blank), scroll=True)

        result = qa.query(query)
        db_query = result["generated_question"]

        # One "<source> <score>" line per document that backed the answer.
        citations = []
        for hit in result["source_documents"]:
            citations.append(hit['doc'].metadata['source'] + ' ' + str(hit['score']))

        # Without any backing document the model answer is replaced wholesale.
        if len(citations) > 0:
            reply = result['answer'] + '\nReferences:\n' + '\n'.join(citations)
        else:
            reply = "I don't know"

        new_rows = [
            pn.Row('Prompt:', pn.pane.Markdown(db_query, width=1300, styles={'background-color': '#cce6ff'})),
            pn.Row('Answer:', pn.pane.Markdown(reply, width=1300, styles={'background-color': '#cce6ff'})),
            pn.Row('Query:', pn.pane.Markdown(query)),
        ]
        self.panels.extend(new_rows)

        # Emptying the input box also clears the loading indicator.
        inp.value = ''

        return pn.WidgetBox(*self.panels[::-1], scroll=True)

# Single conversation handler; its convchain method is bound to the text input
# when the UI is assembled.
cb = conv_rag()

In [15]:
# Text box for the user's question; cb.convchain is bound to it and its result
# is what the conversation view displays.
inp = pn.widgets.TextInput(width=500, placeholder='Enter text here…')
conversation = pn.bind(cb.convchain, inp)

# Conversation tab: input row, divider, bound chat view (with loading
# indicator), divider.
conversation_view = pn.panel(conversation, loading_indicator=True)
tab1 = pn.Column(pn.Row(inp), pn.layout.Divider(), conversation_view, pn.layout.Divider())

# Top-level page: title row above the tab strip.
title_row = pn.Row(pn.pane.Markdown('#AIBot'))
tab_strip = pn.Tabs(('Conversation', tab1))
dashboard = pn.Column(title_row, tab_strip)

In [16]:
# Serve the dashboard on port 8002 under the page title "AI Assistant".
# NOTE(review): the recorded run printed the launch URL and then an
# AssertionError with no message; the cause is not visible in this notebook -
# confirm how pn.serve is meant to be started from inside a notebook kernel.
server = pn.serve(dashboard,title="AI Assistant", port = 8002)

Launching server at http://localhost:8002


AssertionError: 