In [2]:
%load_ext autoreload
%autoreload 2

In [6]:
import sys
import os
sys.path.insert(0, os.path.abspath('..'))
from llmagent.secret import AK_SK
keys = AK_SK('../llmagent/secret/keystore/qianfan.keys')
os.environ["QIANFAN_ACCESS_KEY"] = keys.get_ak()
os.environ["QIANFAN_SECRET_KEY"] = keys.get_sk()

In [7]:
from llmagent.llmapi import QianfanLLM, QianfanEmbedding, model_spec

llm = QianfanLLM(model_spec=model_spec.Speed128K, temperature=0.5)



In [1]:
from atlassian import Confluence
from langchain import hub
from langchain_chroma import Chroma
from langchain_core.output_parsers.transform import BaseTransformOutputParser
from langchain_core.output_parsers import StrOutputParser
from langchain_core.runnables import RunnablePassthrough
from langchain_text_splitters import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import ConfluenceLoader
from langchain.schema import BaseOutputParser


In [8]:
vectorstore = Chroma("qianfan", embedding_function=QianfanEmbedding(), persist_directory="../vectorstore/fintopia.vec")

In [11]:
vectorstore._collection.count()

505

## 第一次初始化时运行

In [8]:
def get_children_pages_recursively(client, page_id: str):
    child_pages = client.get_page_child_by_type(page_id)
    for page in child_pages:
        yield page
        if "id" in page:
            yield from get_children_pages_recursively(client, page["id"])

from getpass import getpass
wiki_psw = getpass("wiki:")
# get all pages under some page
confluence = Confluence(
    url='https://wiki.fintopia.tech/',
    username='tongxinwen@fintopia.tech',
    password=wiki_psw)
child_pages = [p for p in get_children_pages_recursively(confluence, "74290517")]

loader = ConfluenceLoader(
    url="https://wiki.fintopia.tech/",
    page_ids=[p["id"] for p in child_pages],
    username="tongxinwen@fintopia.tech",
    api_key=wiki_psw,
    limit=10,
)
documents = loader.load()

text_splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=100)
splits = text_splitter.split_documents(documents)

batch_size = 16
doc_ids = []
for i in range(0, len(splits), batch_size):
    print(f"Adding documents {i} to {i+batch_size}")
    texts = [doc.page_content for doc in splits[i:i+batch_size]]
    metadatas = [doc.metadata for doc in splits[i:i+batch_size]]
    doc_ids += vectorstore.add_texts(texts, metadatas)

Adding documents 0 to 16
Adding documents 16 to 32
Adding documents 32 to 48
Adding documents 48 to 64
Adding documents 64 to 80
Adding documents 80 to 96
Adding documents 96 to 112
Adding documents 112 to 128
Adding documents 128 to 144
Adding documents 144 to 160
Adding documents 160 to 176
Adding documents 176 to 192
Adding documents 192 to 208
Adding documents 208 to 224
Adding documents 224 to 240
Adding documents 240 to 256
Adding documents 256 to 272
Adding documents 272 to 288
Adding documents 288 to 304
Adding documents 304 to 320
Adding documents 320 to 336
Adding documents 336 to 352
Adding documents 352 to 368
Adding documents 368 to 384
Adding documents 384 to 400
Adding documents 400 to 416
Adding documents 416 to 432
Adding documents 432 to 448
Adding documents 448 to 464
Adding documents 464 to 480
Adding documents 480 to 496
Adding documents 496 to 512


In [21]:
# vectorstore.search("数据库", search_type="similarity")

[Document(metadata={'id': '95160416', 'source': 'https://wiki.fintopia.tech/display/riskDev/2024H2', 'title': '2024H2', 'when': '2024-07-09T17:39:33.000+08:00'}, page_content='200ms KR3 模型推理框架国内外统一 【P2】 KR4 模型cache表优化，提高插入和查询效率，同时降低存储压力【P2】 建立HBase + Hive + PolarDB 3位一体的多级缓存 查询方面通过PolarDB + HBase提供高效查询，存储方面通过Hive提供长期存储功能，方面回溯各种问题 建立各级存储模块的过期策略，在PolarDB中存储原来的30%的数据，HBase存储未过期model的所有记录，hive存储最近3年的所有记录 O5 海外平台搭建 【P0】 以菲律宾、印尼为主 目标，训练全部迁移到ailab，推理迁移到新框架。'),
 Document(metadata={'id': '95160416', 'source': 'https://wiki.fintopia.tech/display/riskDev/2024H2', 'title': '2024H2', 'when': '2024-07-09T17:39:33.000+08:00'}, page_content='200ms KR3 模型推理框架国内外统一 【P2】 KR4 模型cache表优化，提高插入和查询效率，同时降低存储压力【P2】 建立HBase + Hive + PolarDB 3位一体的多级缓存 查询方面通过PolarDB + HBase提供高效查询，存储方面通过Hive提供长期存储功能，方面回溯各种问题 建立各级存储模块的过期策略，在PolarDB中存储原来的30%的数据，HBase存储未过期model的所有记录，hive存储最近3年的所有记录 O5 海外平台搭建 【P0】 以菲律宾、印尼为主 目标，训练全部迁移到ailab，推理迁移到新框架。'),
 Document(metadata={'id': '91856753', 'source': 'https://wiki.fintopia.tech/pages/vi

## 构建Rag chain

In [41]:
from langchain_core.prompts import ChatPromptTemplate
# 感觉需要精简一下问题模版。现在llm中会自动记录之前的问答历史，但是如果增加了context，那么每次都需要api处理多个相同的context，感觉非常浪费
prompt = ChatPromptTemplate.from_messages(
        [
            ("human", "你是一个解答用户问题的assistant，可以根据从语料库中召回的context文本回答问题。请尽量保证回答的内容都可以在context中找到根据，并务必保留 source 后面的url链接。\n以下是context文本：{context}，\n问题是：{question}"),
            ("assistant", "根据context文本，我认为答案是："),
        ]
    )

In [61]:
vectorstore.search("数据库", search_type="similarity", k=6)
# vectorstore.search?

[Document(metadata={'id': '91856753', 'source': 'https://wiki.fintopia.tech/pages/viewpage.action?pageId=91856753', 'title': '7. 在AiLab中连接hive', 'when': '2024-09-03T14:48:03.000+08:00'}, page_content='因为权限管理和LDAP一样，所以用户如果想要申请某个表的权限，需要去数仓申请 https://dw-lighthouse.fintopia.tech/tools/hive-permission 另外，如果不想把自己的密码保存在ipynb中，可以使用getpass函数，如下，这样就保存在passwd这个变量里了 ！！注意 ！！ hive.read_sql 这句话会把整个表下载到本地。强烈建议先在SQL中对数据进行过滤，采样或者group。 比如如果想知道某个表的行数，我们应该这样做： py 而不是这样：否则很可能把AiLab打挂 py 向hive中导入表 py ",'),
 Document(metadata={'id': '97182399', 'source': 'https://wiki.fintopia.tech/pages/viewpage.action?pageId=97182399', 'title': '模型迁移ModelServe', 'when': '2024-09-09T19:29:40.000+08:00'}, page_content='优缺点:直接读库开发简单,但是需要频繁扫model_result大表(百亿级),可能会有性能问题. 方案一:直接通过hivesql,通过hive直接对比增量数据 优缺点:通过hive的资源去做对比避免影响线上mysql性能,但是需要注意大批量的对比sql可能会超时. 5.一键切换模型服务到新 通过mirror服务核对后,确认模型迁移后无diff,需要提供一键转换功能,将模型的部署方式转换为ModelServe部署,主要更新MODEL_DATA中的deployType为3.然后清楚掉模型的缓存,让新的预测重新从数据库加载进行'),
 Document(metadata={'id': '97182399', 'sour

In [95]:
from langchain_core.retrievers import BaseRetriever
from langchain.vectorstores import VectorStore
from langchain.llms.base import LLM
from pydantic import Field

class VectorStoreRetrieverMoreHistory(BaseRetriever):
    vs: VectorStore
    llm: LLM
    topk: int = Field(gt=0, default=5, description="Number of documents to retrieve")

    def _get_relevant_documents(
        self, chat_str: str, *, run_manager):
        ai_ans = [c['content'] for c in self.llm.conversation_history if c['role'] == 'assistant']
        user_quest = self.llm.user_question_history
        qa_pairs = []
        for q, a in zip(user_quest, ai_ans):
            qa_pairs += [q, a]
            
        if chat_str.startswith(".. "):
        # append the last 2 conversation 
            query = "\n".join(qa_pairs[-2:] + [chat_str[3:]])
            self.llm.add_user_question(chat_str[3:])
        elif chat_str.startswith(".... "):
            # append all conversation history
            query = "\n".join(qa_pairs + [chat_str[5:]])
            self.llm.add_user_question(chat_str[5:])
        else:
            self.llm.clear_history()
            query = chat_str
            self.llm.add_user_question(query)
        return self.vs.similarity_search(query, k=self.topk)

retriever = VectorStoreRetrieverMoreHistory(vs=vectorstore, llm=llm, topk=8)


In [113]:
def format_docs(docs):
    return "\n\n".join(f"{d.page_content}\nsource: {d.metadata['source']}" for d in docs) 

llm.clear_history()
    
rag_chain = (
    {"context": retriever | format_docs, "question": RunnablePassthrough()}
    | prompt
    | llm
    | StrOutputParser()
)

In [114]:
rag_chain.invoke("公共资源池和私有资源池的区别")



'公共资源池和私有资源池的主要区别在于资源获取方式和计费方式。\n\n公共资源池是阿里云的后付费资源池，用户可以根据需求申请使用，按照实际使用的小时数进行计费，不用则不计费。创建和启动实例的过程相对较慢。\n\n私有资源池则是用户预付费购买的资源池，创建和启动实例的速度较快。用户在使用时需要先付费购买资源，然后才能使用。相比于公共资源池，私有资源池的资源更加稳定，不会因资源紧张而导致任务提交失败。但是，如果多人同时使用私有资源池，可能会导致资源不足，需要排队等待或者进入初始化状态。因此，在选择资源池时需要根据实际需求进行选择。以上信息来源于context文本中的描述。'

In [115]:
rag_chain.invoke("如何进行分布式LightGBM模型训练")



'分布式LightGBM模型训练可以通过以下步骤进行：\n\n1. 数据分片：将数据集分成多个分片文件，每个分片文件包含数据的一部分。分片的数量需要是并行处理节点的整数倍。\n2. 分布式计算框架的搭建：利用Ray等分布式计算框架，构建多机的分布式计算环境。\n3. 提交训练任务：通过Ray框架提交LightGBM的训练任务到集群上，开启分布式训练。\n4. 数据加载和模型训练：平台会自动将训练集和测试集的数据平均分配到各个worker节点上，然后利用LightGBM官方提供的训练API进行模型训练。在分布式训练过程中，每个机器都会在自己本地数据集上进行训练，并通过某种投票算法选出最优的模型更新到最终的模型里。\n\n请注意，以上步骤需要根据具体情况进行调整和优化，以确保分布式训练的效果和效率。另外，分布式训练可能会带来一些精度损失，需要根据实际需求进行权衡和选择。如有更多疑问或需求，建议参考LightGBM官方文档或咨询相关领域的专家。'

In [116]:
rag_chain.invoke(".. 需要具备哪些前提条件")



'在进行分布式训练时，需要具备以下前提条件：\n\n1. 数据集必须是分片好的，且分片个数需要是并行处理节点的整数倍。\n2. 数据集必须是parquet格式。\n3. 训练集和测试集的数据需要自动分配到各个处理节点上，以减少load data带来的开销和单节点的内存压力。\n4. 需要准备训练参数，这些参数通过yaml文件定义。\n5. 在使用某些服务或功能时，可能需要用户的label列、train和valid数据集，且这些数据集必须放在同一个父目录下。\n6. 对于使用AiLab的api提交的情况，训练样本的parquet格式最好不要超过60G，否则可能会出现内存不足无法训练的情况。\n\n以上是根据context文本提供的信息整理的需要具备的前提条件，如果还有其他细节要求或特定环境的要求，请进一步确认。'

In [117]:
rag_chain.invoke(".. 参数建议如何设置")



'参数建议的设置需要根据具体情况而定，对于可选参数，可以使用默认值；对于必选参数，则需要按照要求填写具体的参数值。\n\n在执行容器时，框架会把参数注入到命令行中，用户需要保证脚本可以接受这样的参数。对于参数的具体设置，可以参考相关官方文档或教程，如Papermill的官网教程。此外，根据实验结果和模型表现，可以对参数进行调整和优化，例如通过HPO自动寻参的方式，自动搜索最优参数。\n\n对于超参数调优，可以使用成熟的Search算法（如贝叶斯优化、GridSearch等）选择最优值。在设置参数时，需要根据具体的模型和任务来确定哪些参数需要调整，以及调整的范围和步长等。\n\n以上信息仅供参考，具体的参数设置建议还需要根据实际需求和实践经验进行调整。'

In [118]:
llm.conversation_history

[{'role': 'user',
  'content': 'Human: 你是一个解答用户问题的assistant，可以根据从语料库中召回的context文本回答问题。请尽量保证回答的内容都可以在context中找到根据，并务必保留 source 后面的url链接。以下是context文本：当资源不够的时候会进入排队，实例会一直处于初始化状态。 解释一下这二者的区别， 固定资源池是我们自己买的一个小池子，这部分是预付费的，即钱已经花出去了，不用就浪费了。特点是：创建和启动实例快，但是如果用的人多了可能没资源。 公共资源池是后付费的，根据小时数计费，不用不计费，属于阿里云的大池子。特点是创建和启动实例比较慢。 建议大家先尝试创建or启动内存型资源池。没资源会提示。如果提示了，再去申请公共资源池 （2）选择弹性资源池时，配置与现状相同 注意，使用固定资源池时，镜像会优先选择在本地缓存的版本。所以第一次拉取镜像的时间会比较长，后面就快很多了。\nsource: https://wiki.fintopia.tech/pages/viewpage.action?pageId=80187151\n\n）当用户提交任务时，我们会优先选择私有资源池的资源，如果私有资源池没有资源了，我们才选择公有资源池的CPU机器。之所以公有资源池不选择GPU机器，是因为GPU库存经常紧张，很容易导致任务提交失败。 PS，另外，我们也调研了一下单机多卡的模式，相比于上述的单卡多机它的优势是减少了网络IO的开销，只在本机内部多个GPU之间交换数据。而GPU之间往往有1GB+，甚至100GB的专有带宽。而且，相比与分布式训练带来的AUC损耗，它对AUC的影响应该也会更少些。但是通过2周多的调研，我们发现LightGBM社区多多卡模式的支持非常差。因此放弃了。 其他问题\nsource: https://wiki.fintopia.tech/pages/viewpage.action?pageId=106844570\n\n这样的文件中。 固定资源池资源申请 1、新增Lab时增加资源池类型选项，单选框 可选“内存型资源池” “计算型资源池” 等 非“公共资源池” 的类型。 （1）选择固定资源池时，提示剩余资源情况 注意：因为阿里云当前的一些策略，这里显示的配额数并不准确。申请时尽量不要把配额中剩余的资

In [119]:
rag_chain.invoke("ailab实例一直黑屏")



'ailab实例一直黑屏大部分情况下，用户打开AILab应该可以直接进入Notebook页面。不过偶尔会卡在上述页面，此时点击一下右上角的关闭按钮即可。有的时候是因为登录过期了。这个字显示的非常不明显。需要自己看看。'

In [110]:
ai_ans = [c['content'] for c in llm.conversation_history if c['role'] == 'assistant']
user_quest = llm.user_question_history
qa_pairs = []
for q, a in zip(user_quest, ai_ans):
    qa_pairs += [q, a]
qa_pairs

['如何进行分布式LightGBM模型训练',
 '进行分布式LightGBM模型训练，可以基于Ray框架来实现。Ray是一款支持LightGBM等多种模型框架的分布式训练机器学习框架。它利用LightGBM自身的分布式训练功能，为LightGBM提供了一套多机的分布式计算框架。在分布式训练时，Ray会调用LightGBM官方提供的训练API进行机器组网，并开启分布式训练。此外，为了提高GPU利用率和训练效率，可以在分布式模式下采用特定的训练算法，如voting。但需要注意，分布式训练可能会导致一定的精度损失。更多细节和原理可以参考LightGBM的官方文档：https://lightgbm.readthedocs.io/en/latest/Features.html#optimization-in-distributed-learning 。\nsource: https://wiki.fintopia.tech/pages/viewpage.action?pageId=97186155、https://wiki.fintopia.tech/pages/viewpage.action?pageId=106844570',
 '需要具备哪些前提条件',
 '为了使用自研的分布式LightGBM打分模版，需要具备以下两个前提条件：1是数据集必须是分片好的，而且分片个数等于并行pod数的整数倍；2是数据集必须是parquet格式。详情请参考：https://wiki.fintopia.tech/pages/viewpage.action?pageId=105312091',
 '参数建议如何设置',
 '参数设置需要根据具体的模型训练任务和数据集特性来决定。在LightGBM的分布式训练中，可以考虑调整诸如"bagging_freq", "max_bin", "min_data_in_bin", "min_data_in_leaf", "min_sum_hessian_in_leaf", "min_child_weight", "boosting_type", "objective", "metric"等参数。具体的参数值需要通过实验和调优来确定，可以使用Optuna等调参工具来帮助选择较优的参数组合。并没有固定的参数设置建议，因为每个数据集和任务都是独特的。\nsou