### Imports & Installation

In [12]:
# !pip install langchain_google_genai
# !pip install langchain_community
# !pip install langchain_huggingface
# !pip install gradio
# !pip install rapidfuzz
# !pip install pypdf
# !pip install faiss-cpu
# !pip install spacy
# !pip install fuzzywuzzy
# !python -m spacy download en_core_web_sm
# !pip install python-Levenshtein
# !pip install django
# !pip install django-cors-headers
# !pip install djangorestframework
# !pip install drf_spectacular
# !pip install psycopg2
# !python.exe -m pip install --upgrade pip

In [17]:
import json, os, re, ast
import subprocess
from langchain_core.output_parsers import StrOutputParser
from langchain.prompts import ChatPromptTemplate
from langchain_google_genai import ChatGoogleGenerativeAI
from langchain_core.runnables import RunnablePassthrough
from langchain.document_loaders import PyPDFLoader
from langchain_huggingface import HuggingFaceEmbeddings
from langchain.vectorstores import FAISS
from langchain.schema import Document
import gradio as gr
from sentence_transformers import SentenceTransformer, util
import spacy
from rapidfuzz import fuzz
pdf_file = "./res/Cli script guidebook.pdf"


### Load Model

In [18]:
def get_llm(model_name="gemini-1.5-pro"):
    return ChatGoogleGenerativeAI(
        model=model_name,
        temperature=0.1,
        max_output_tokens=500,
        google_api_key=os.environ.get("GOOGLE_API_KEY")
    )

#### Load API KEY from environment

In [19]:
api_key_path = './credentials/google_credentials.json'
if not os.path.exists(api_key_path):
    raise FileNotFoundError(f"API key file not found at {api_key_path}. Please provide the correct path.")

with open(api_key_path, 'r') as f:
    api_key = json.load(f)
os.environ['GOOGLE_API_KEY'] = api_key.get("GOOGLE_API_KEY")

### Load & Prepare Data

In [20]:
loader = PyPDFLoader(pdf_file)
pdf_docs = loader.load_and_split()

# Load mapping
with open("res/mapping.json", "r") as f:
    raw_mapping = json.load(f)

with open("res/data_mapping.json", "r") as f:
    data_mapping = json.load(f)

# Format mapping as Document
mapping_text = "\n".join(f"{k} -> {v}" for k, v in raw_mapping.items())
mapping_doc = Document(page_content=mapping_text)

all_docs = pdf_docs + [mapping_doc]

### Create Vector Space

In [21]:
embeddings = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
vectorstore = FAISS.from_documents(documents=all_docs, embedding=embeddings)
retriever = vectorstore.as_retriever()

### RAG Chain

In [22]:
def format_docs(docs):
    return "\n\n".join(doc.page_content for doc in docs)

def combine_inputs(input_dict):
    question = input_dict["question"]
    retrieved_docs = retriever.get_relevant_documents(question)
    return {
        "context": format_docs(retrieved_docs),
        "question": question
    }

template = """You are an expert assistant trained to extract exact command-line instructions from a user guide.

You are also provided with a node-to-subcommand mapping reference (e.g., "logistic_regression" → "model", "standard_scaler" → "preprocessor"). Use this mapping only to determine the correct value for <node_name> in the CLI command syntax.

Reference Information:
{context}

User Query:
{question}

Instructions:
- Carefully read the Reference Information, including any node-to-subcommand mappings and CLI usage documentation.
- Return the CLI command(s) that are relevant to the user's query with the node name in the node-to subcommand mapping.
- Use the mapping to replace <node_name> with its corresponding type (e.g., model, preprocessor, etc.)
- IMPORTANT: Never modify or replace <args>. Keep <args> exactly as it is shown.
- Format your response as a raw Python list: ['command1', 'command2', ...]
- Do NOT include any explanations, comments, or formatting outside the list.
- If no command matches the query, return an empty list: []
- The output MUST be ordered in the same order as the input.

Example Query:
I want to make a logistic regression model and make a project.

Expected Response:
[
    'make model <args>',
    'create_project <project_id>'
]
"""

prompt = ChatPromptTemplate.from_template(template)

rag_chain = (
    RunnablePassthrough()
    | combine_inputs
    | prompt
    | get_llm(model_name="gemini-2.0-flash")
    | StrOutputParser()
)

### Keyword Extraction (Hybrid)

In [23]:
name_extractor = SentenceTransformer('all-MiniLM-L6-v2')
nlp = spacy.load("en_core_web_sm")
REFERENCE_KEYWORDS = list(raw_mapping.keys())
keyword_embeddings = name_extractor.encode(REFERENCE_KEYWORDS, convert_to_tensor=True)

def extract_keywords_hybrid(user_input: str, reference_keywords, top_n, transformer_thresh=0.7, fuzzy_thresh=80):
    keyword_embeddings = name_extractor.encode(reference_keywords, convert_to_tensor=True)
    doc = nlp(user_input.lower())
    phrases = [chunk.text for chunk in doc.noun_chunks]
    matched_keywords = []

    for phrase in phrases:
        phrase_embedding = name_extractor.encode(phrase, convert_to_tensor=True)
        cosine_scores = util.cos_sim(phrase_embedding, keyword_embeddings)[0]

        for idx, score in enumerate(cosine_scores):
            if score >= transformer_thresh:
                if reference_keywords[idx] not in matched_keywords:
                    matched_keywords.append(reference_keywords[idx])

        # === Fuzzy matching fallback ===
        for keyword in reference_keywords:
            if fuzz.ratio(phrase, keyword.replace("_", " ")) >= fuzzy_thresh:
                if keyword not in matched_keywords:
                    matched_keywords.append(keyword)
    top_n = min(top_n, len(matched_keywords))
    best_match = matched_keywords.copy()
    if len(matched_keywords) > top_n:
        best_match = sorted(matched_keywords, key=lambda x: fuzz.ratio(user_input, x), reverse=True)[:top_n]
    matched_keywords = list(filter(lambda x: x in best_match, matched_keywords))
    return matched_keywords

### Post-Processing

In [24]:
def parse_command_list(output: str):
    pattern = r"\[(.*?)\]"
    matches = re.findall(pattern, output, re.DOTALL)

    if matches:
        output = f"[{matches[0]}]"

    try:
        command_list = ast.literal_eval(output.strip())
        return command_list if isinstance(command_list, list) else [command_list]
    except Exception as e:
        return [f"Failed to parse: {e}"]

def args_extractor(names, mapping, data_mapping):
    result = []
    for name in names:
        if name in mapping and name in data_mapping:
            # node_type = mapping[name]
            args_str = json.dumps(data_mapping[name], separators=(',', ':'))
            result.append(args_str)
    return result

def replace_args(commands, replacements, names, mapper):

    c = 0
    for i, command in enumerate(commands):
        if "<args>" in command:
            if names[c] in list(mapper.keys()):
                commands[i] = command.replace("<args>", replacements[c])
                c += 1
            else:
                continue
        else:
            continue

    return commands

### Run Everything

In [25]:
question = "please make a single imputer"
# RAG output
rag_output = rag_chain.invoke({"question": question})

parsed_output = parse_command_list(rag_output)
# Extract command names
names = extract_keywords_hybrid(question, REFERENCE_KEYWORDS, top_n=len(parsed_output))
# Extract args and types
args_list = args_extractor(names, raw_mapping, data_mapping)
# Parse RAG and replace placeholders
final_output = replace_args(parsed_output, args_list, names, raw_mapping)
final_output

  retrieved_docs = retriever.get_relevant_documents(question)


['make preprocessor {"preprocessor_name":"simple_imputer","preprocessor_type":"imputer","params":{"strategy":"mean"}}']