<a href="https://colab.research.google.com/github/changedi/DPpro/blob/master/aiops_ragbuild.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install -r requirements.txt

In [None]:
import asyncio

from llama_index.core import Settings, QueryBundle
from llama_index.core.base.llms.types import ChatMessage, MessageRole
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.legacy.llms import DashScope, DashScopeGenerationModels
from qdrant_client import models
from tqdm.asyncio import tqdm
import os
import time

In [None]:
# Global embedding model (BAAI/bge-large-zh-v1.5), downloaded/cached under the
# current directory and embedding in batches of 128 texts.
# NOTE: the misspelled name `embeding` is referenced by later cells, so keep it.
# NOTE(review): VECTOR_SIZE=1024 in the config cell presumably has to match this
# model's output dimension — confirm if the model is changed.
embeding = HuggingFaceEmbedding(
        model_name="BAAI/bge-large-zh-v1.5",
        cache_folder="./",
        embed_batch_size=128,
    )
# Make it the default embedding model for llama_index components that are not
# given one explicitly.
Settings.embed_model = embeding

In [None]:
from llama_index.core import SimpleDirectoryReader
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.extractors import SummaryExtractor
from llama_index.core.ingestion import IngestionPipeline
from llama_index.core.llms.llm import LLM
from llama_index.core.vector_stores.types import BasePydanticVectorStore
from llama_index.core.node_parser import SentenceSplitter
from llama_index.core.schema import Document, MetadataMode
from llama_index.vector_stores.qdrant import QdrantVectorStore
from qdrant_client import AsyncQdrantClient, models
from qdrant_client.http.exceptions import UnexpectedResponse

In [None]:
from typing import Sequence
from llama_index.core.extractors.interface import BaseExtractor
from llama_index.core.schema import BaseNode
import re

class CustomFilePathExtractor(BaseExtractor):
    """Shorten each node's ``file_path`` and tag it with its source directory.

    For every node that has a ``file_path`` in its metadata:

    * if the full path contains ``data/<umac|director|rcp|emsplus>/``, that
      directory name is stored in ``metadata["file_source_dir"]`` (later used
      as the retrieval filter key);
    * ``metadata["file_path"]`` is rewritten to keep only its last
      ``last_path_length`` "/"-separated components.

    Nodes without a ``file_path`` are passed through unchanged.
    """

    # Number of trailing path components kept in metadata["file_path"].
    last_path_length: int = 4

    def __init__(self, last_path_length: int = 4, **kwargs):
        super().__init__(last_path_length=last_path_length, **kwargs)

    @classmethod
    def class_name(cls) -> str:
        return "CustomFilePathExtractor"

    async def aextract(self, nodes: Sequence[BaseNode]) -> list[dict]:
        """Update path metadata in place and return each node's metadata dict."""
        metadata_list = []
        for node in nodes:
            full_path = node.metadata.get("file_path")
            if full_path is not None:
                # Normalise Windows separators so the split and regex work on any OS.
                full_path = str(full_path).replace("\\", "/")
                # Match on the FULL path: after truncation the "data/" prefix may
                # already be gone for deeply nested files, which would leave the
                # node without file_source_dir and make it unreachable by the filter.
                match = re.search(r"data/(umac|director|rcp|emsplus)/", full_path)
                if match is not None:
                    node.metadata["file_source_dir"] = match.group(1)
                node.metadata["file_path"] = "/".join(
                    full_path.split("/")[-self.last_path_length :]
                )
            metadata_list.append(node.metadata)
        return metadata_list


class CustomTitleExtractor(BaseExtractor):
    """Tag every node with a ``document_title`` taken from its document's text.

    The title is the first line of the first node in each run of consecutive
    nodes that share the same ``file_path``. This assumes nodes from one
    document arrive contiguously — presumably true for the splitter output
    this runs after; verify if the pipeline changes.
    """

    def __init__(self, *args, **kwargs):
        # Positional arguments are accepted for backward compatibility but ignored.
        super().__init__(**kwargs)

    @classmethod
    def class_name(cls) -> str:
        return "CustomTitleExtractor"

    async def aextract(self, nodes: Sequence[BaseNode]) -> list[dict]:
        """Set ``metadata["document_title"]`` on each node; return the metadata dicts.

        Nodes without a ``file_path`` are grouped under ``None`` instead of
        raising ``KeyError``; an empty ``nodes`` yields an empty list.
        """
        unset = object()  # sentinel that never equals a real file_path (or None)
        last_file_path = unset
        document_title = ""
        metadata_list = []
        for node in nodes:
            file_path = node.metadata.get("file_path")
            if file_path != last_file_path:
                # First node of a new document: its first line becomes the title.
                document_title = node.text.split("\n")[0]
                last_file_path = file_path
            node.metadata["document_title"] = document_title
            metadata_list.append(node.metadata)
        return metadata_list


In [None]:
def read_data(path: str = "data") -> list[Document]:
    """Load every ``.txt`` file found under *path* (recursively) as Documents."""
    txt_only = [".txt"]
    loader = SimpleDirectoryReader(input_dir=path, recursive=True, required_exts=txt_only)
    return loader.load_data()

def build_pipeline(
    llm: LLM,
    embed_model: BaseEmbedding,
    template: str | None = None,
    vector_store: BasePydanticVectorStore | None = None,
    chunk_size: int = 1024,
    chunk_overlap: int = 50,
) -> IngestionPipeline:
    """Build the ingestion pipeline: split, add title/path metadata, embed.

    Args:
        llm: Currently unused; reserved for an LLM summary-extraction step.
        embed_model: Embedding model applied as the last transformation.
        template: Currently unused; prompt for that summary-extraction step.
        vector_store: If given, the pipeline writes the embedded nodes into it.
        chunk_size: Chunk size passed to ``SentenceSplitter``.
        chunk_overlap: Overlap between consecutive chunks for ``SentenceSplitter``.

    Returns:
        The configured ``IngestionPipeline``.
    """
    transformations = [
        SentenceSplitter(chunk_size=chunk_size, chunk_overlap=chunk_overlap),
        CustomTitleExtractor(metadata_mode=MetadataMode.EMBED),
        CustomFilePathExtractor(last_path_length=4, metadata_mode=MetadataMode.EMBED),
        # NOTE: a SummaryExtractor(llm=llm, metadata_mode=MetadataMode.EMBED,
        # prompt_template=template) step used to sit here; re-enable it to make
        # use of the `llm` and `template` arguments.
        embed_model,
    ]
    return IngestionPipeline(transformations=transformations, vector_store=vector_store)

async def build_vector_store(
    config: dict, reindex: bool = False, path: str = "./qdrant"
) -> tuple[AsyncQdrantClient, QdrantVectorStore]:
    """Open a local Qdrant instance and make sure the target collection exists.

    Args:
        config: Settings dict. ``COLLECTION_NAME`` (default ``"aiops24"``) and
            ``VECTOR_SIZE`` (default ``1024``) are read; a missing or empty
            value falls back to the default.
        reindex: If True, delete the collection first so it is rebuilt.
        path: Directory used by the local (on-disk) Qdrant client.

    Returns:
        The async client and a ``QdrantVectorStore`` bound to the collection.
    """
    # Resolve once so delete / exists / create / store all target the same
    # collection (the existence check previously skipped the fallback).
    collection_name = config.get("COLLECTION_NAME") or "aiops24"
    vector_size = config.get("VECTOR_SIZE") or 1024

    client = AsyncQdrantClient(path=path)
    if reindex:
        try:
            await client.delete_collection(collection_name)
            print("Reindex Start...")
        except UnexpectedResponse as e:
            print(f"Collection not found: {e}")

    try:
        if not await client.collection_exists(collection_name):
            print("Collection Creating...")
            await client.create_collection(
                collection_name=collection_name,
                on_disk_payload=True,
                vectors_config=models.VectorParams(
                    # DOT equals cosine similarity only for unit-length vectors;
                    # assumes the embedding model normalises its output — TODO confirm.
                    size=vector_size, distance=models.Distance.DOT
                ),
            )
            print("Collection Created!")
    except UnexpectedResponse:
        # Presumably a concurrent create; treated as "already exists".
        print("Collection already exists")
    return client, QdrantVectorStore(
        aclient=client,
        collection_name=collection_name,
        parallel=4,
        batch_size=32,
    )

In [None]:
# Open the local Qdrant store; with reindex=True the collection is deleted and
# recreated empty.
# NOTE(review): because reindex=True runs on every execution of this cell, the
# ingestion cell (which only runs when points_count == 0) re-embeds the whole
# corpus each time — set reindex=False to reuse an existing index.
config = {"COLLECTION_NAME":"aiops24", "VECTOR_SIZE":1024}
client, vector_store = await build_vector_store(config, reindex=True)

# Snapshot of the collection's stats; `points_count` is checked by the ingestion cell.
collection_info = await client.get_collection(
  config["COLLECTION_NAME"] or "aiops24"
)

In [None]:
from typing import Iterable
import jsonlines


def read_jsonl(path):
    """Return every JSON-object line of the file at *path* as a list of dicts.

    Lines that are not valid JSON objects are skipped silently.
    """
    with jsonlines.open(path, "r") as reader:
        return list(reader.iter(type=dict, skip_invalid=True))


def save_answers(
    queries: Iterable, results: Iterable, path: str = "data/answers.jsonl"
):
    """Write one ``{"id", "query", "answer"}`` record per query to a JSONL file.

    Args:
        queries: Dicts with at least ``"id"`` and ``"query"`` keys.
        results: Generated answers, aligned one-to-one with *queries*.
        path: Output file; opened in "w" mode, so existing content is replaced.

    If the two inputs differ in length, a warning is emitted and only the
    aligned prefix is written (the previous version dropped the tail silently).
    """
    import warnings

    query_list = list(queries)
    result_list = list(results)
    if len(query_list) != len(result_list):
        warnings.warn(
            f"save_answers: {len(query_list)} queries but {len(result_list)} results; "
            "only the aligned prefix is written.",
            stacklevel=2,
        )
    answers = [
        {"id": query["id"], "query": query["query"], "answer": result}
        for query, result in zip(query_list, result_list)
    ]
    with jsonlines.open(path, "w") as writer:
        writer.write_all(answers)


In [None]:
!git clone https://www.modelscope.cn/datasets/issaccv/aiops2024-challenge-dataset.git /dataset

In [None]:
!unzip /dataset/data.zip -d ./

In [None]:
from dotenv import dotenv_values, load_dotenv
# Load variables from a local .env file into os.environ (existing variables are
# not overridden by default).
load_dotenv()
# Fail fast with a clear message; the previous
# `os.environ[k] = os.getenv(k)` was a no-op when the key was set and raised an
# opaque TypeError (assigning None) when it was not.
if not os.getenv("DASHSCOPE_API_KEY"):
    raise RuntimeError(
        "DASHSCOPE_API_KEY is not set; add it to .env or export it before running."
    )
llm = DashScope(model_name=DashScopeGenerationModels.QWEN_TURBO)

In [None]:
# Sanity check: the source-directory regex used by CustomFilePathExtractor
# should extract "umac" from a path containing ".../data/umac/...".
ff = re.search(
    r'data/(umac|director|rcp|emsplus)/',
    "abcdafds/fdsfsdfs/fdsfdsf/data/umac/fdsafew",
)[1]
print(ff)

umac


In [None]:
# Load every .txt document under ./data (presumably created by the unzip cell above).
data = read_data("data")

In [None]:
if collection_info.points_count == 0:
        t_start = time.time()
        print(f"read data finished in {time.time()-t_start}s and total {len(data)} records.")
        pipeline = build_pipeline(llm, embeding, vector_store=vector_store)
        # 暂时停止实时索引
        await client.update_collection(
            collection_name=config["COLLECTION_NAME"] or "aiops24",
            optimizer_config=models.OptimizersConfigDiff(indexing_threshold=0),
        )
        t_start = time.time()
        await pipeline.arun(documents=data, show_progress=True, num_workers=1)
        # 恢复实时索引
        await client.update_collection(
            collection_name=config["COLLECTION_NAME"] or "aiops24",
            optimizer_config=models.OptimizersConfigDiff(indexing_threshold=20000),
        )
        print(f"build vector store finished. in {time.time()-t_start}s, total:{collection_info.points_count}")

In [None]:
!wget "https://www.modelscope.cn/api/v1/models/qwen/Qwen2-7B-Instruct-GGUF/repo?Revision=master&FilePath=qwen2-7b-instruct-q8_0.gguf" -O qwen-2-7b-instruct.gguf

In [None]:
!curl -fsSL https://ollama.com/install.sh | sh

In [None]:
!OLLAMA_FLASH_ATTENTION=1 nohup /usr/local/bin/ollama serve &

In [None]:
!echo "FROM ./qwen-2-7b-instruct.gguf" > Model_file
!/usr/local/bin/ollama create qwen_local -f Model_file


In [None]:
!/usr/local/bin/ollama ps

In [None]:
!nohup /usr/local/bin/ollama run qwen_local &

In [None]:
!curl http://localhost:11434/api/generate -d '{"model": "qwen_local","prompt":"Why is the sky blue?"}'

In [None]:
from llama_index.llms.ollama import Ollama
llm = Ollama(
        model="qwen2", base_url="http://localhost:11434", temperature=0, request_timeout=120
    )

res = await llm.acomplete("你好，你是谁")
print(res)

In [2]:
from typing import List
import qdrant_client

from llama_index.core.llms.llm import LLM
from llama_index.vector_stores.qdrant import QdrantVectorStore
from llama_index.core.postprocessor.types import BaseNodePostprocessor
from llama_index.core.vector_stores import VectorStoreQuery, MetadataFilter, FilterOperator, MetadataFilters
from llama_index.core import (
    QueryBundle,
    PromptTemplate,
    StorageContext,
    VectorStoreIndex,
)
from llama_index.core.embeddings import BaseEmbedding
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore
from llama_index.core.base.llms.types import CompletionResponse, ChatMessage, MessageRole

from qdrant_client.http.models import Filter, FieldCondition, MatchValue, MatchAny


# Answer-generation prompt, filled via PromptTemplate(...).format(context_str=...,
# query_str=...) in rag_with_knowledge_retrieval_querybundle.
# English gist: "Context: {context_str}. Answer from the context rather than your
# own knowledge, possibly in bullet points; if the context has nothing relevant you
# may try to answer yourself; don't repeat the context or output knowledge you
# don't have. Distinguish question types: for 'why' explain the cause, for 'how'
# list methods and steps, for 'what' answer directly. Question: {query_str} Answer:"
# NOTE(review): each line's 4-space indent is part of the string and is sent to the
# model verbatim; also "answer from context, not own knowledge" and "you may try to
# answer yourself" pull in opposite directions — confirm which is intended.
QA_TEMPLATE = """\
    上下文信息如下：
    ----------
    {context_str}
    ----------
    请你基于上下文信息而不是自己的知识，回答以下问题，可以分点作答，如果上下文信息没有相关知识，你可以尝试自己回答，不要复述上下文信息，不要输出你不知道的知识.
    请注意区分问题类型，如果问题是问为什么，请解答原因；如果问题是问如何做，请列出方法和步骤；如果问题是问是什么，请直接回答是什么.
    问题：\
    {query_str}

    回答：\
    """

class QdrantRetriever(BaseRetriever):
    """Dense retriever over a Qdrant collection, optionally scoped to one source dir.

    The source directory to search (matched against the ``file_source_dir``
    metadata key) is taken from ``query_bundle.custom_embedding_strs[0]``; when
    that list is missing or empty, the whole collection is searched.
    The top hit is always kept; further hits are kept only if their score is
    at least ``score_threshold``.
    """

    def __init__(
        self,
        vector_store: QdrantVectorStore,
        embed_model: BaseEmbedding,
        similarity_top_k: int = 2,
        score_threshold: float = 0.3,
    ) -> None:
        self._vector_store = vector_store
        self._embed_model = embed_model
        self._similarity_top_k = similarity_top_k
        self._score_threshold = score_threshold
        super().__init__()

    def _source_filter(self, query_bundle: QueryBundle) -> Filter | None:
        """Return a ``file_source_dir`` payload filter, or None for no scoping."""
        sources = query_bundle.custom_embedding_strs
        if not sources or not sources[0]:
            return None
        return Filter(
            must=[
                FieldCondition(key="file_source_dir", match=MatchValue(value=sources[0]))
            ]
        )

    def _select_nodes(self, query_result) -> List[NodeWithScore]:
        """Keep the best hit unconditionally and later hits scoring >= threshold."""
        nodes = query_result.nodes or []
        similarities = query_result.similarities or []
        return [
            NodeWithScore(node=node, score=similarity)
            for rank, (node, similarity) in enumerate(zip(nodes, similarities))
            if rank == 0 or similarity >= self._score_threshold
        ]

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Synchronous retrieval (required by ``BaseRetriever``).

        Needs a vector store created with a sync client; the notebook's store
        only has ``aclient``, so it uses the async path below.
        """
        query_embedding = self._embed_model.get_query_embedding(query_bundle.query_str)
        query_result = self._vector_store.query(
            VectorStoreQuery(query_embedding, similarity_top_k=self._similarity_top_k),
            qdrant_filters=self._source_filter(query_bundle),
        )
        return self._select_nodes(query_result)

    async def _aretrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Asynchronous retrieval used by ``aretrieve``."""
        query_embedding = await self._embed_model.aget_query_embedding(
            query_bundle.query_str
        )
        query_result = await self._vector_store.aquery(
            VectorStoreQuery(query_embedding, similarity_top_k=self._similarity_top_k),
            qdrant_filters=self._source_filter(query_bundle),
        )
        return self._select_nodes(query_result)


# Default system message; in English roughly: "You are an operations engineer
# in cloud computing and networking; identify domain keywords and key information
# and answer the question by reading the given knowledge."
_DEFAULT_SYSTEM_PROMPT = (
    "你是一个云计算和网络领域的运维工程师，请尽量识别领域关键词和关键信息，阅读给定的知识回答问题"
)


async def rag_with_knowledge_retrieval_querybundle(
    query_bundle: QueryBundle,
    retriever: BaseRetriever,
    llm: LLM,
    qa_template: str = QA_TEMPLATE,
    reranker: BaseNodePostprocessor | None = None,
    debug: bool = False,
    detail: bool = False,
    generate: bool = True,
    progress=None,
    system_prompt: str = _DEFAULT_SYSTEM_PROMPT,
) -> str | None:
    """Retrieve context for *query_bundle*, optionally rerank it, then ask *llm*.

    Args:
        query_bundle: The question; passed unchanged to retriever and reranker.
        retriever: Source of candidate nodes.
        llm: Chat model that writes the answer.
        qa_template: Prompt with ``{context_str}`` and ``{query_str}`` placeholders.
        reranker: Optional post-processor applied to the retrieved nodes.
        debug: Print retrieval scores, reranked nodes and the generated answer.
        detail: Together with *debug*, also print the full retrieved nodes.
        generate: If False, only retrieve (useful to inspect retrieval quality).
        progress: Optional object with ``update(n)`` (e.g. a tqdm bar), advanced by 1.
        system_prompt: System message sent before the formatted QA prompt.

    Returns:
        The model's answer, or None when *generate* is False.
    """
    node_with_scores = await retriever.aretrieve(query_bundle)
    if debug:
        score_str = ",".join(
            f"{node.metadata.get('document_title', '')}: {node.score}"
            for node in node_with_scores
        )
        print(f"retrieved question: {query_bundle.query_str} \nnodes num:{len(node_with_scores)} \nscores:{score_str}\n------")
    if detail and debug:
        print(f"retrieved {query_bundle.query_str}:\n{node_with_scores}\n------")
    if reranker:
        node_with_scores = reranker.postprocess_nodes(node_with_scores, query_bundle)
        if debug:
            print(f"reranked:\n{node_with_scores}\n------")

    answer = None
    if generate:
        # Prefix each chunk with its document title (missing titles become "").
        context_str = "\n\n".join(
            f"{node.metadata.get('document_title', '')}: {node.text}"
            for node in node_with_scores
        )
        fmt_qa_prompt = PromptTemplate(qa_template).format(
            context_str=context_str, query_str=query_bundle.query_str
        )
        messages = [
            ChatMessage.from_str(system_prompt, MessageRole.SYSTEM),
            ChatMessage.from_str(fmt_qa_prompt, MessageRole.USER),
        ]
        chat_response = await llm.achat(messages)
        answer = chat_response.message.content
        if debug:
            print(f"generated :\n{answer}\n")
    if progress:
        progress.update(1)
    return answer



ModuleNotFoundError: No module named 'qdrant_client'

In [None]:
# Retriever over the Qdrant store, returning up to 5 candidate chunks per query.
retriever = QdrantRetriever(vector_store, embeding, similarity_top_k=5)

# Single-question smoke test; custom_embedding_strs carries the source directory
# (e.g. "umac") that QdrantRetriever uses as its file_source_dir filter.
# query_bundle = QueryBundle(query_str="""
#         SGSN/MME网元的采集方式有哪几种？
#     """, custom_embedding_strs=["umac"])
# result = await rag_with_knowledge_retrieval_querybundle(
#     query_bundle, retriever, llm, debug=True, detail=True
# )
# print(result)

# Inspect retrieval results only (generate=False skips the LLM call).
# queries = read_jsonl("question.jsonl")
# for query in tqdm(queries, total=len(queries)):
#     query_bundle = QueryBundle(query_str=query["query"], custom_embedding_strs=[query["document"]])
#     await rag_with_knowledge_retrieval_querybundle(
#         query_bundle, retriever, llm, debug=True, generate=False
#     )

# Generate answers sequentially, one LLM call per question. Each record in
# question.jsonl is read for its "id", "query" and "document" keys.
print("Start generating answers...")
queries = read_jsonl("question.jsonl")
results = []
for query in tqdm(queries, total=len(queries)):
    # if query["id"] > 2:
    #     break
    query_bundle = QueryBundle(query_str=query["query"], custom_embedding_strs=[query["document"]])
    result = await rag_with_knowledge_retrieval_querybundle(
        query_bundle, retriever, llm, debug=True
    )
    results.append(result)

# Save results: one {"id", "query", "answer"} record per question.
save_answers(queries, results, "result.jsonl")