In [16]:
# 准备模型
from llama_index.core import Settings
from llama_index.llms.ollama import Ollama
from llama_index.embeddings.ollama import OllamaEmbedding
from llama_index.llms.openai_like import OpenAILike
from llama_index.llms.openai import OpenAI
from dotenv import  load_dotenv
import os

load_dotenv()

# Settings.llm = Ollama(
#     model="llama3.1:latest",
#     # model="qwen2.5:latest",
#     request_timeout=120.0
# )

# Deepseek coder
Settings.llm = OpenAILike(
    api_base=os.environ["deepseek_api_base"],
    api_key=os.environ["deepseek_api_key"],
    model="deepseek-coder"
)

# GPTUS
# Settings.llm = OpenAI(
#     api_base="https://www.gptapi.us",
#     api_key="sk-uAMMNTlQQozks47C6fB1C10cDaAc4659B44c96157e6829B8",
#     model="gpt-4o"
# )

Settings.embed_model = OllamaEmbedding(model_name="mxbai-embed-large" )

In [2]:
# 解析HQL到执行计划并且更新到metadata中

from llama_index.core.schema import Document
from pyspark.sql import SparkSession
import os

os.environ["HADOOP_HOME"] = "C:\\Users\\chen_\\.env\\hadoop-3.4.0"

spark = SparkSession.builder.appName("HQL").getOrCreate()

def updateMetaForHQL(doc: Document):
    file_path = doc.metadata["file_path"]
    plan = spark._jsparkSession.sessionState().sqlParser().parsePlan(doc.text)
    doc.text = plan.toJSON()
    doc.metadata["category"] = "Spark SQL logical plan"

    

In [3]:
# 构建解析后的documents
from llama_index.core import SimpleDirectoryReader
from enum import Enum
import os

class FileType(Enum):
    HQL = ".hql"

documents = SimpleDirectoryReader("data").load_data()
for doc in documents:
    file_name: str = doc.metadata["file_name"]
    _, file_extension = os.path.splitext(file_name)
    if file_extension == FileType.HQL.value:
        updateMetaForHQL(doc)

In [4]:
documents[0]

Document(id_='3d9c9b16-5f5d-4eb2-9b22-dbf38d09a08a', embedding=None, metadata={'file_path': 'c:\\Users\\chen_\\Workspace\\genai\\llama_index_test\\data\\test.hql', 'file_name': 'test.hql', 'file_size': 418, 'creation_date': '2024-10-10', 'last_modified_date': '2024-10-10', 'category': 'Spark SQL logical plan'}, excluded_embed_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], excluded_llm_metadata_keys=['file_name', 'file_type', 'file_size', 'creation_date', 'last_modified_date', 'last_accessed_date'], relationships={}, text='[{"class":"org.apache.spark.sql.catalyst.plans.logical.Sort","num-children":1,"order":[[{"class":"org.apache.spark.sql.catalyst.expressions.SortOrder","num-children":1,"child":0,"direction":{"object":"org.apache.spark.sql.catalyst.expressions.Descending$"},"nullOrdering":{"object":"org.apache.spark.sql.catalyst.expressions.NullsLast$"},"sameOrderExpressions":[]},{"class":"org.apache.spark.sql.catalys

In [5]:
# 构建索引
from llama_index.core import VectorStoreIndex

index = VectorStoreIndex.from_documents(documents, show_progress=True)
index.storage_context.persist(persist_dir="./index")

Parsing nodes: 100%|██████████| 1/1 [00:00<00:00, 248.37it/s]
Generating embeddings: 100%|██████████| 3/3 [00:00<00:00,  8.30it/s]


In [6]:
# 从持久化后的索引中恢复
from llama_index.core import load_index_from_storage, StorageContext

storage_context = StorageContext.from_defaults(persist_dir="./index")
index = load_index_from_storage(storage_context)

In [17]:
from llama_index.core.llms import ChatMessage, MessageRole
from llama_index.core import ChatPromptTemplate

chat_text_qa_msgs = [
    (
        "system",
        """
        You are a helpful assistant that answers questions about the SQL language.
        When you answer the question, please don't use technological terms, like 'logical plan', 'sql' etc.
        You should answer in a natural language way, it's better in business words.
        """
    )
]
text_qa_template = ChatPromptTemplate.from_messages(chat_text_qa_msgs)

refine_msgs = [
    ("system", "If you can't make sure the answer, just say 'Sorry, I don't know'")
]
refine_template = ChatPromptTemplate.from_messages(refine_msgs)

engine = index.as_chat_engine(
    system_prompt=text_qa_template,
    # refine_template=refine_template
)

while True:
    text_input = input("User: ")
    if text_input == "exit":
        break
    resp = engine.chat(text_input)
    print(f"Agent: {resp}")

Agent: Hello! How can I assist you today? If you have any questions about the provided Spark SQL logical plan or need help understanding its components, feel free to ask!


KeyboardInterrupt: 