In [None]:
!pip install groq langchain_community langgraph faiss-cpu pypdf

In [2]:
import ast
import getpass
import os
import re
import shutil
import subprocess
import sys
import tempfile
from typing import List, TypedDict, Literal, Optional

from groq import Groq
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.document_loaders import DirectoryLoader, PyPDFLoader
from langchain_community.embeddings import HuggingFaceEmbeddings
from langchain_community.tools.tavily_search import TavilySearchResults
from langchain_community.vectorstores import FAISS
from langgraph.graph import StateGraph, END

# Look the Arduino command-line tool up on PATH once; the result is either the
# executable's path or None, and the matching notice is printed below.
arduino_cli_path = shutil.which('arduino-cli')
if not arduino_cli_path:
    print("WARNING: 'arduino-cli' not found in PATH. Falling back to basic C++ syntax checks for Arduino code.")
else:
    print(f"Found 'arduino-cli' at: {arduino_cli_path}. Will use it for validation.")


def get_groq_api_key():
    """Return the Groq API key, preferring the environment over a prompt.

    Returns:
        The value of the ``GROQ_API_KEY`` environment variable when it is
        set; otherwise whatever is typed at a hidden ``getpass`` prompt.
    """
    key_from_env = os.environ.get("GROQ_API_KEY")
    if key_from_env is None:
        print("GROQ_API_KEY not found in environment variables.")
        return getpass.getpass("Please enter your Groq API key: ")

    print("Found GROQ_API_KEY in environment variables.")
    return key_from_env

def get_tavily_api_key():
    """Return the Tavily web-search API key, preferring the environment over a prompt.

    Returns:
        The value of the ``TAVILY_API_KEY`` environment variable when it is
        set; otherwise whatever is typed at a hidden ``getpass`` prompt.
    """
    key_from_env = os.environ.get("TAVILY_API_KEY")
    if key_from_env is None:
        print("TAVILY_API_KEY not found.")
        return getpass.getpass("Please enter your Tavily API key for web search: ")

    print("Found TAVILY_API_KEY in environment variables.")
    return key_from_env

# Collect both credentials up front so a missing key is reported immediately.
groq_api_key = get_groq_api_key()
tavily_api_key = get_tavily_api_key()

if not groq_api_key or not tavily_api_key:
    print("One or more API keys are missing. Exiting.")
    # Raise instead of calling exit(): in a notebook kernel exit() requests a
    # kernel shutdown rather than stopping the cell at this line, so the
    # client below would still be built with an empty key.
    raise SystemExit(1)

client = Groq(api_key=groq_api_key)
# Exported for the Tavily search tool, which presumably reads the key from the
# environment — confirm against the tool's documentation.
os.environ["TAVILY_API_KEY"] = tavily_api_key
embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-large-en-v1.5")

GROQ_API_KEY not found in environment variables.
Please enter your Groq API key: ··········
TAVILY_API_KEY not found.
Please enter your Tavily API key for web search: ··········


  embedding_model = HuggingFaceEmbeddings(model_name="BAAI/bge-large-en-v1.5")


modules.json:   0%|          | 0.00/349 [00:00<?, ?B/s]

config_sentence_transformers.json:   0%|          | 0.00/124 [00:00<?, ?B/s]

README.md: 0.00B [00:00, ?B/s]

sentence_bert_config.json:   0%|          | 0.00/52.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/779 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/1.34G [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/366 [00:00<?, ?B/s]

vocab.txt: 0.00B [00:00, ?B/s]

tokenizer.json: 0.00B [00:00, ?B/s]

special_tokens_map.json:   0%|          | 0.00/125 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/191 [00:00<?, ?B/s]

In [3]:
# --- Document Loading from PDF ---
def load_documents_from_directory(path: str) -> List:
    """Load every PDF found under ``path`` (searched recursively).

    Args:
        path: Directory that is globbed with ``**/*.pdf``.

    Returns:
        The list returned by ``DirectoryLoader.load()``; it is empty when no
        PDF produced any document.

    Raises:
        SystemExit: If ``path`` is not an existing directory.
    """
    print(f"Loading PDF documents from '{path}'...")
    if not os.path.isdir(path):
        print(f"Error: Directory '{path}' not found.")
        print(f"Please create the '{path}' directory and add your PDF book(s).")
        # Raise instead of calling exit(): in a notebook kernel exit() does not
        # stop execution at this line, so the loader below would still run
        # against the missing directory.
        raise SystemExit(1)

    loader = DirectoryLoader(path, glob="**/*.pdf", loader_cls=PyPDFLoader, show_progress=True, use_multithreading=True)
    documents = loader.load()
    if not documents:
        print(f"No PDF documents found in '{path}'. The agent will rely solely on web search.")
    else:
        print(f"Successfully loaded {len(documents)} document(s).")
    return documents

class AgentState(TypedDict):
    """State dictionary passed between the nodes of the workflow graph."""
    # The user's query; read by the retrieval, web-search and generation nodes.
    question: str
    # Context snippets, each prefixed with its citation tag ("[1] ...", "[WEB1] ...").
    documents: List[str]
    # Raw text of the latest LLM response.
    generation: str
    # True when the generation node classified the response as containing code.
    is_code: bool
    # Language detected in the response; drives which validator runs.
    code_type: Literal["python", "cpp", "none"]
    # Validator error message; the validators store None when the code passes.
    validation_error: Optional[str]
    # Number of generation attempts made so far (incremented by the generation node).
    retries: int
    # The response handed back to the caller (copied from `generation`).
    final_answer: str
    # Source descriptions whose tags match the ones used in `documents`.
    citations: List[str]

# --- Graph Nodes ---

def retrieve_documents(state: AgentState):
    """Graph node: fetch the chunks most relevant to the question.

    Uses the module-level ``retriever``. Each hit is numbered from 1; the
    number tags both the stripped chunk text and a citation line built from
    the hit's ``source`` and ``page`` metadata.

    Returns:
        A partial state update with ``documents`` and ``citations``; both are
        empty lists when ``retriever`` is falsy.
    """
    print("\n--- 📄 Retrieving Documents from PDF Content ---")
    query = state["question"]
    if not retriever:
        return {"documents": [], "citations": []}

    numbered_hits = list(enumerate(retriever.invoke(query), start=1))
    retrieved_chunks = [
        f"[{number}] {hit.page_content.strip()}" for number, hit in numbered_hits
    ]
    citations = [
        f"[{number}] Source: {hit.metadata.get('source', 'Unknown')}, "
        f"Page: {hit.metadata.get('page', 'N/A')}"
        for number, hit in numbered_hits
    ]
    print(f"Retrieved {len(retrieved_chunks)} relevant chunks for the query.")
    return {"documents": retrieved_chunks, "citations": citations}

def web_search(state: AgentState):
    """Graph node: add web search results to the retrieved context.

    Runs a Tavily search (up to 3 results) for the question and appends each
    result as a ``[WEBn]``-tagged document with a matching citation. The search
    is best-effort: any failure is reported and the existing context is
    returned as-is.

    Returns:
        A partial state update with new ``documents`` and ``citations`` lists.
        The lists held by the incoming ``state`` are never modified.
    """
    print("--- 🌐 Performing Web Search ---")
    question = state["question"]
    # Work on copies so the incoming state is not mutated behind the graph's back,
    # and tolerate a state that has no retrieved context yet.
    documents = list(state.get("documents") or [])
    citations = list(state.get("citations") or [])
    try:
        # Built inside the try so a construction failure (e.g. a missing key)
        # degrades to "no web context" like any other search failure.
        search_tool = TavilySearchResults(max_results=3)
        search_results = search_tool.invoke(question)
        for i, res in enumerate(search_results, 1):
            content = res["content"].strip()
            url = res["url"]
            documents.append(f"[WEB{i}] {content}")
            citations.append(f"[WEB{i}] {url}")
        print(f"Web search found {len(search_results)} results.")
    except Exception as e:
        print(f"Web search failed: {e}. Proceeding without web context.")
    return {"documents": documents, "citations": citations}


def generate_code_or_doc(state: AgentState):
    """Graph node: ask the LLM for code (or an explanation) plus documentation.

    Builds a prompt from the collected context, the citations, the previous
    validation error (if any) and the question, sends it to the
    ``llama-3.1-8b-instant`` model through the module-level ``client``, and
    classifies the response language.

    Returns:
        A partial state update with ``generation`` (response text, "" when the
        model returned no content), ``is_code``, ``code_type``
        ("python" / "cpp" / "none"), ``retries`` (incremented by one) and the
        unchanged ``citations``.
    """
    print("--- 🤖 Generating Response with Llama 3.1 ---")
    question = state['question']
    documents = state.get('documents') or []
    validation_error = state.get('validation_error')
    retries = state.get('retries', 0)
    citations = state.get('citations', [])

    system_prompt = (
        "You are an expert programmer specializing in Arduino (C++) and Raspberry Pi (Python). "
        "Your task is to provide complete, runnable code or clear technical explanations "
        "based on the user's question and the provided context. "
        "When using information from the documents, cite them with their index like [1], [2]. "
        "You must also generate detailed documentation for the code so that a beginner can understand it."
    )

    # Separate the chunks with blank lines so one snippet's text does not run
    # into the "[n]" tag of the next.
    context_block = "\n\n".join(documents)
    user_prompt_parts = [f"Combined Context (from PDF and Web):\n{context_block}"]
    if citations:
        user_prompt_parts.append("Citations:\n" + "\n".join(citations))
    if validation_error:
        print(f"--- ⚠️ Retrying (Attempt {retries}) with validation error ---")
        user_prompt_parts.append(
            f"The previous code attempt failed with the following validation error: '{validation_error}'. "
            "Please analyze the error and provide a corrected script with explanations."
        )

    user_prompt_parts.append(f"User's Question: {question}")
    user_prompt_parts.append(
        "Please provide:\n"
        "1. The complete runnable code.\n"
        "2. A separate section titled '📘 Documentation' explaining the code step by step "
        "in markdown with headings and bullet points."
    )
    user_prompt = "\n\n".join(user_prompt_parts)

    completion = client.chat.completions.create(
        model="llama-3.1-8b-instant",
        messages=[
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ],
        temperature=0.6,
        max_tokens=8000,
    )
    # Guard against a response without text so language detection cannot crash.
    generation = completion.choices[0].message.content or ""

    code_type = _detect_code_type(generation)
    is_code = code_type != "none"

    print(f"Generation complete. Identified as code: {is_code}. Type: {code_type}.")
    return {
        "generation": generation,
        "is_code": is_code,
        "code_type": code_type,
        "retries": retries + 1,
        "citations": citations
    }


# Markdown fence tags mapped to the code types the validators understand.
_FENCE_LANGUAGE_TAGS = {
    "python": "python",
    "py": "python",
    "cpp": "cpp",
    "c++": "cpp",
    "arduino": "cpp",
    "ino": "cpp",
}
_FENCE_TAG_RE = re.compile(r"```[ \t]*([A-Za-z+]+)")
# A real function definition at the start of a line (a bare "def " substring
# also matches "#ifdef " / "#ifndef "), or a Raspberry Pi hardware import.
_PYTHON_HINT_RE = re.compile(
    r"^[ \t]*def[ \t]+\w+[ \t]*\(|import RPi\.GPIO|import smbus", re.MULTILINE
)
_CPP_HINTS = ("void setup()", "void loop()", "#include <Arduino.h>")


def _detect_code_type(generation):
    """Classify an LLM response as 'python', 'cpp' or 'none'."""
    # The first recognised fence tag is the most explicit signal.
    for tag in _FENCE_TAG_RE.findall(generation):
        code_type = _FENCE_LANGUAGE_TAGS.get(tag.lower())
        if code_type:
            return code_type
    # Otherwise fall back to keyword hints, Python first as before.
    if _PYTHON_HINT_RE.search(generation):
        return "python"
    if any(hint in generation for hint in _CPP_HINTS):
        return "cpp"
    return "none"

def clean_code(code_string: str, language: str) -> str:
    """Extract the code from a markdown-formatted LLM response.

    Lookup order:
      1. the first fenced block tagged with ``language`` or one of its common
         aliases (matched case-insensitively);
      2. the first fenced block with any tag or none;
      3. the whole text with the fence markers removed.

    Args:
        code_string: The full response text.
        language: Fence tag to look for, e.g. "python" or "cpp".

    Returns:
        The extracted code with surrounding whitespace stripped.
    """
    fence_aliases = {
        "python": ("python", "py"),
        "cpp": ("cpp", "c++", "arduino", "ino"),
    }
    aliases = fence_aliases.get(language.lower(), (language,))
    # Escape every tag: "c++" would otherwise be read as regex quantifiers.
    tag_alternatives = "|".join(re.escape(alias) for alias in aliases)

    tagged_block = re.search(
        rf"```(?:{tag_alternatives})\s*\n(.*?)\n```",
        code_string,
        re.DOTALL | re.IGNORECASE,
    )
    if tagged_block:
        return tagged_block.group(1).strip()

    # No matching tag: prefer any fenced block over returning prose and code
    # mixed together.
    any_block = re.search(r"```[^\n`]*\n(.*?)\n```", code_string, re.DOTALL)
    if any_block:
        return any_block.group(1).strip()

    return code_string.replace("```", "").strip()

In [4]:
def validate_python_code(state: AgentState):
    """Graph node: check that the generated Python parses.

    The code is extracted from ``state["generation"]`` with ``clean_code`` and
    handed to ``ast.parse``; nothing is executed.

    Returns:
        ``{"validation_error": None}`` when parsing succeeds, otherwise a
        message describing the syntax error (or the absence of code).
    """
    print("--- 🐍 Validating Python Code ---")
    source = clean_code(state["generation"], "python")
    if not source:
        return {"validation_error": "No Python code found to validate."}

    try:
        ast.parse(source)
    except SyntaxError as exc:  # IndentationError is a SyntaxError subclass
        failure = f"Python Syntax Error: {exc}"
        print(f"Validation failed: {failure}")
        return {"validation_error": failure}

    print("Validation successful: Python code is syntactically correct.")
    return {"validation_error": None}

def validate_cpp_code(state: AgentState):
    """Graph node: validate the generated Arduino sketch by compiling it.

    Writes the extracted code to a throw-away sketch folder and runs
    ``arduino-cli compile`` for the ``arduino:avr:uno`` board. When
    ``arduino-cli`` was not found on PATH, delegates to
    ``validate_cpp_fallback``.

    Returns:
        ``{"validation_error": None}`` when compilation succeeds, otherwise a
        non-empty message with the compiler diagnostics (or a note that no
        code was found).
    """
    if not arduino_cli_path:
        return validate_cpp_fallback(state)
    print("--- 🔬 Validating C++/Arduino Code with arduino-cli ---")
    code = clean_code(state["generation"], "cpp")
    if not code:
        return {"validation_error": "No C++ code found to validate."}

    with tempfile.TemporaryDirectory() as temp_dir:
        # The sketch folder and the .ino file share one name (presumably what
        # arduino-cli expects — confirm against its documentation).
        sketch_name = "temp_sketch"
        sketch_path = os.path.join(temp_dir, sketch_name)
        os.makedirs(sketch_path)
        sketch_file_path = os.path.join(sketch_path, f"{sketch_name}.ino")
        # Explicit UTF-8: generated comments may contain non-ASCII characters
        # that the platform's default encoding cannot represent.
        with open(sketch_file_path, "w", encoding="utf-8") as f:
            f.write(code)
        command = [arduino_cli_path, "compile", "--fqbn", "arduino:avr:uno", sketch_path]
        result = subprocess.run(command, capture_output=True, text=True, check=False)

    if result.returncode == 0:
        print("Validation successful: Arduino sketch compiled successfully.")
        return {"validation_error": None}

    # The error must never be empty: an empty string is falsy and the router
    # would treat the failed compilation as a success.
    error_message = (
        (result.stderr or "").strip()
        or (result.stdout or "").strip()
        or f"arduino-cli exited with status {result.returncode} without any diagnostics."
    )
    print(f"Validation failed: Compilation error.\n{error_message}")
    return {"validation_error": error_message}

def validate_cpp_fallback(state: AgentState):
    """Graph node helper: structural sanity checks for an Arduino sketch.

    Used when ``arduino-cli`` is unavailable. Checks that ``setup`` and
    ``loop`` are defined and that braces and parentheses are balanced by
    count. This is a heuristic only: characters inside strings or comments are
    counted too, and nothing is compiled.

    Returns:
        ``{"validation_error": None}`` when every check passes, otherwise the
        failed checks joined with " | " (or a note that no code was found).
    """
    print("--- 🔬 Validating C++/Arduino Code ---")
    code = clean_code(state["generation"], "cpp")
    if not code:
        return {"validation_error": "No C++ code found to validate."}

    def defines(function_name):
        # Tolerate spacing variants and the C-style "(void)" parameter list,
        # which an exact "void setup()" substring test would reject.
        signature = rf"\bvoid\s+{function_name}\s*\(\s*(?:void)?\s*\)"
        return re.search(signature, code) is not None

    errors = []
    if not defines("setup"):
        errors.append("Missing 'void setup()' function.")
    if not defines("loop"):
        errors.append("Missing 'void loop()' function.")
    if code.count('{') != code.count('}'):
        errors.append("Mismatched curly braces {}.")
    if code.count('(') != code.count(')'):
        errors.append("Mismatched parentheses ().")

    if errors:
        error_msg = " | ".join(errors)
        print(f"Validation failed: {error_msg}")
        return {"validation_error": error_msg}

    print("Validation successful: C++/Arduino code seems structurally plausible.")
    return {"validation_error": None}

def prepare_final_output(state: AgentState):
    """Graph node: publish the latest generation as the final answer.

    Returns:
        A partial state update holding ``final_answer`` (taken from
        ``state["generation"]``) and ``citations`` (an empty list when the
        state has none).
    """
    print("--- ✅ Preparing Final Output ---")
    answer = state["generation"]
    sources = state.get("citations", [])
    return dict(final_answer=answer, citations=sources)

# --- Conditional Edges for Routing ---
def route_to_validation(state: AgentState):
    """Pick the edge to follow after generation.

    Returns:
        "end" when the response was not classified as code, otherwise the
        detected ``code_type``.
    """
    if not state["is_code"]:
        return "end"
    return state["code_type"]

def route_after_validation(state: AgentState):
    """Pick the edge to follow after a validator has run.

    Returns:
        "retry" when the state holds a (truthy) validation error and fewer
        than ``MAX_RETRIES`` attempts have been made; otherwise "end".
    """
    problem = state.get("validation_error")
    if not problem:
        return "end"
    attempts = state.get("retries", 0)
    return "retry" if attempts < MAX_RETRIES else "end"

In [7]:
# route_after_validation stops retrying once state["retries"] reaches this
# value; the generation node increments that counter on every run.
MAX_RETRIES = 2
KNOWLEDGE_BASE_PATH = "knowledge_base"

print("\n--- Setting up RAG System for PDF Knowledge Base ---")
raw_documents = load_documents_from_directory(KNOWLEDGE_BASE_PATH)
# Stays None when no PDF was loaded; the retrieval node then returns no context.
retriever = None
if raw_documents:
    text_splitter = RecursiveCharacterTextSplitter(chunk_size=1500, chunk_overlap=200)
    docs = text_splitter.split_documents(raw_documents)
    print(f"PDF content split into {len(docs)} chunks.")
    # Reuse the embedding model created in the setup cell instead of loading
    # the same weights a second time.
    print("Embedding chunks with the already-loaded model (this may take a moment)...")
    vectorstore = FAISS.from_documents(documents=docs, embedding=embedding_model)
    retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
    print("RAG system is ready.")


--- Setting up RAG System for PDF Knowledge Base ---
Loading PDF documents from 'knowledge_base'...


100%|██████████| 2/2 [00:10<00:00,  5.38s/it]


Successfully loaded 494 document(s).
PDF content split into 567 chunks.
Initializing embedding model (this may take a moment)...
RAG system is ready.


In [8]:
# --- Build Workflow ---
workflow = StateGraph(AgentState)

# Register every node under the name the edges below refer to.
for node_name, node_fn in (
    ("retrieve", retrieve_documents),
    ("web_search", web_search),
    ("generate", generate_code_or_doc),
    ("validate_python", validate_python_code),
    ("validate_cpp", validate_cpp_code),
    ("prepare_output", prepare_final_output),
):
    workflow.add_node(node_name, node_fn)

# Linear part: retrieve -> web_search -> generate.
workflow.set_entry_point("retrieve")
workflow.add_edge("retrieve", "web_search")
workflow.add_edge("web_search", "generate")

# After generation, go to the validator for the detected language, or straight
# to the output node when the response is not code.
workflow.add_conditional_edges(
    "generate",
    route_to_validation,
    {
        "python": "validate_python",
        "cpp": "validate_cpp",
        "none": "prepare_output",
        "end": "prepare_output",
    },
)

# Both validators route the same way: regenerate on "retry", finish on "end".
for validator_node in ("validate_python", "validate_cpp"):
    workflow.add_conditional_edges(
        validator_node,
        route_after_validation,
        {"retry": "generate", "end": "prepare_output"},
    )

workflow.add_edge("prepare_output", END)
app = workflow.compile()

In [6]:
# Best-effort export of the workflow diagram; a failure here must not stop the
# notebook, so it is reported and execution continues.
try:
    image_bytes = app.get_graph().draw_mermaid_png()
    with open("workflow_graph.png", "wb") as f:
        f.write(image_bytes)
    print("\n--- 📊 Workflow graph image saved to workflow_graph.png ---")
except Exception as e:
    print(f"\n--- ⚠️ Could not generate graph image: {e} ---")
    # By default draw_mermaid_png() renders through the Mermaid web API, not a
    # local graphviz install, so the usual cause of failure is no network access.
    print("draw_mermaid_png() renders via the mermaid.ink web service by default; please check that this machine has network access.")


--- 📊 Workflow graph image saved to workflow_graph.png ---


In [9]:
# --- Run Queries ---
queries = [
    "Based on the book, what is the C++ code for reading an analog sensor with an Arduino?",
    "Find the most popular Python library for controlling an I2C LCD screen with a Raspberry Pi and write a script to display 'Hello World'."
]

# Run each question through the compiled graph and print its answer and sources.
banner = '=' * 50
for query_number, query_text in enumerate(queries, start=1):
    print(f"\n{banner}\n🚀 Executing Query {query_number}: '{query_text}'\n{banner}")
    result = app.invoke({"question": query_text})

    print("\n\n--- ✨ Final Answer ✨ ---")
    print(result.get('final_answer', 'No final answer was generated.'))
    print("\n--- 📚 Citations ---")
    for citation in result.get("citations", []):
        print(citation)
    print("--------------------------\n")


🚀 Executing Query 1: 'Based on the book, what is the C++ code for reading an analog sensor with an Arduino?'

--- 📄 Retrieving Documents from PDF Content ---
Retrieved 2 relevant chunks for the query.
--- 🌐 Performing Web Search ---


  search_tool = TavilySearchResults(max_results=3)


Web search found 3 results.
--- 🤖 Generating Response with Llama 3.1 ---
Generation complete. Identified as code: True. Type: cpp.
--- 🔬 Validating C++/Arduino Code ---
Validation successful: C++/Arduino code seems structurally plausible.
--- ✅ Preparing Final Output ---


--- ✨ Final Answer ✨ ---
### Complete Runnable Code

```cpp
// the setup routine runs once when you press reset:
void setup() {
  // initialize serial communication at 9600 bits per second:
  Serial.begin(9600);
  // initialize all readings to 0
  int i;
  for (i = 0; i < 10; i++) {
    readings[i] = 0;
  }
  index = 0;
  total = 0;
}

// the loop routine runs over and over again forever:
void loop() {
  // read the input on analog pin A0:
  int sensorValue = analogRead(A0);
  // print out the value you read:
  Serial.println(sensorValue);
  delay(1);        // delay in between reads for stability

  // Read from the sensor
  readings[index] = sensorValue;
  total += readings[index];
  index = (index + 1);
  if (inde