In [1]:
import os
import socket
from typing import List, Dict
import subprocess
import time
import re
import threading
import queue

# LangChain and related libraries
from langchain_community.document_loaders import WebBaseLoader
from langchain_community.vectorstores import Chroma
from langchain_core.prompts import PromptTemplate
from langchain_huggingface import HuggingFaceEmbeddings
from langchain_text_splitters import RecursiveCharacterTextSplitter
from pydantic import BaseModel, Field
from langchain_ollama.chat_models import ChatOllama

# Search library
try:
    from googlesearch import search
except ImportError:
    print("Error: 'googlesearch-python' is not installed. Please run 'pip install googlesearch-python'")

os.environ["USER_AGENT"] = "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/109.0"

USER_AGENT environment variable not set, consider setting it to identify your requests.


In [2]:
class SolBenchmarker:
    """
    An asynchronous agent that submits and monitors CPU and GPU benchmark
    jobs in parallel on the ASU Sol supercomputer using multithreading.
    """
    def __init__(self, user: str, python_env: str = "rapids25.02"):
        if not user or user == "YOUR_ASURITE_ID":
            raise ValueError("A valid ASURITE username is required for SolBenchmarker.")
        self.user = user
        self.python_env = python_env

    def _generate_cpu_sbatch(self, script_dir: str, script_name: str) -> str:
        # This helper method remains unchanged.
        return f"""#!/bin/bash
#SBATCH -p general
#SBATCH -q public
#SBATCH -t 0-00:10:00
#SBATCH -c 1
#SBATCH -o {script_dir}/slurm_cpu-%j.out
#SBATCH -e {script_dir}/slurm_cpu-%j.err

module load mamba/latest
source activate {self.python_env}
/usr/bin/time -p python3 {script_dir}/{script_name} 2>&1
"""

    def _generate_gpu_sbatch(self, script_dir: str, script_name: str) -> str:
        # This helper method remains unchanged.
        return f"""#!/bin/bash
#SBATCH -p general
#SBATCH -q public
#SBATCH -G 1
#SBATCH -A grp_hackathon2025
#SBATCH --reservation=hackathon2025
#SBATCH -t 0-00:10:00
#SBATCH -c 1
#SBATCH -o {script_dir}/slurm_gpu-%j.out
#SBATCH -e {script_dir}/slurm_gpu-%j.err

module load mamba/latest
source activate {self.python_env}
/usr/bin/time -p python3 {script_dir}/{script_name} 2>&1
"""

    def _submit_job(self, sbatch_script: str) -> str:
        # This helper method remains unchanged.
        sbatch_path = os.path.join(os.getcwd(), "benchmark_files", f"sbatch_job_{int(time.time())}_{os.getpid()}.sh")
        with open(sbatch_path, "w") as f:
            f.write(sbatch_script)

        process = subprocess.run(f"sbatch {sbatch_path}", shell=True, capture_output=True, text=True)
        os.remove(sbatch_path)

        if process.returncode != 0:
            raise RuntimeError(f"sbatch submission failed: {process.stderr}")

        job_id_match = re.search(r"Submitted batch job (\d+)", process.stdout.strip())
        if not job_id_match:
            raise RuntimeError(f"Could not parse Job ID from sbatch output: {process.stdout}")

        return job_id_match.group(1)

    def _monitor_job(self, job_id: str, job_type: str, benchmark_dir: str, output_queue: queue.Queue):
        """
        Monitors a single SLURM job in a separate thread.
        Polls squeue and, upon completion, parses the output and puts the result on a queue.
        This function runs in a separate thread for each job (CPU/GPU).
        """
        print(f"--> [Thread-{job_type}] Started monitoring Job {job_id}.")
        try:
            while True:
                # Check if the job is still in the queue
                squeue_cmd = f"squeue -u {self.user} -j {job_id}"
                process = subprocess.run(squeue_cmd, shell=True, capture_output=True, text=True)

                if job_id not in process.stdout:
                    # Job is no longer in the queue, it has finished.
                    output_queue.put(
                        ("status", f"--> [Agent 4] Job `{job_id}` ({job_type}) completed. Parsing results...")
                    )

                    # Wait a brief moment for the filesystem to catch up and write the log file.
                    time.sleep(2)

                    log_prefix = "slurm_cpu" if job_type == "CPU" else "slurm_gpu"
                    output_file_path = os.path.join(benchmark_dir, f"{log_prefix}-{job_id}.out")

                    if os.path.exists(output_file_path):
                        with open(output_file_path, "r") as f:
                            content = f.read()

                        time_match = re.search(r"real\s+([\d.]+)", content)
                        exec_time = float(time_match.group(1)) if time_match else None
                        result = {"type": job_type, "time": exec_time, "log": content}
                    else:
                        result = {"type": job_type, "time": None, "log": f"Error: Log file not found at {output_file_path}"}

                    output_queue.put(("result", result))
                    print(f"--> [Thread-{job_type}] Finished monitoring Job {job_id}.")
                    break  # Exit the monitoring loop for this thread

                # Wait before polling again
                time.sleep(5)
        except Exception as e:
            error_result = {"type": job_type, "time": None, "log": f"An error occurred in the monitoring thread: {e}"}
            output_queue.put(("result", error_result))
            print(f"--> [Thread-{job_type}] Error monitoring Job {job_id}: {e}")


    def run_benchmark_async(self, cpu_code: str, gpu_code: str):
        """
        Submits CPU and GPU jobs and yields their results as they complete.
        This is the main generator function. It uses separate threads to monitor
        each job independently, ensuring results are streamed to the UI immediately.
        """
        benchmark_dir = os.path.join(os.getcwd(), "benchmark_files")
        os.makedirs(benchmark_dir, exist_ok=True)

        # Write the CPU and GPU scripts
        cpu_script_path = os.path.join(benchmark_dir, "cpu_benchmark.py")
        gpu_script_path = os.path.join(benchmark_dir, "gpu_benchmark.py")
        with open(cpu_script_path, "w") as f: f.write(cpu_code)
        with open(gpu_script_path, "w") as f: f.write(gpu_code)

        try:
            # Submit CPU and GPU jobs
            cpu_job_id = self._submit_job(self._generate_cpu_sbatch(benchmark_dir, "cpu_benchmark.py"))
            gpu_job_id = self._submit_job(self._generate_gpu_sbatch(benchmark_dir, "gpu_benchmark.py"))

            yield f"--> [Agent 4] Submitted CPU Job `{cpu_job_id}` and GPU Job `{gpu_job_id}`. Monitoring in parallel..."

            # A thread-safe queue to hold results from monitor threads
            results_queue = queue.Queue()

            # Create and start monitoring threads
            cpu_monitor_thread = threading.Thread(
                target=self._monitor_job,
                args=(cpu_job_id, "CPU", benchmark_dir, results_queue),
                daemon=True
            )
            gpu_monitor_thread = threading.Thread(
                target=self._monitor_job,
                args=(gpu_job_id, "GPU", benchmark_dir, results_queue),
                daemon=True
            )

            cpu_monitor_thread.start()
            gpu_monitor_thread.start()

            # Expect 4 items total: 2 status updates and 2 final results
            completed_items = 0
            while completed_items < 4:
                try:
                    # Block and wait for any result to be put on the queue
                    event_type, data = results_queue.get(timeout=300)  # 5-minute timeout
                    yield data  # Yield the status string OR the result dictionary
                    completed_items += 1
                except queue.Empty:
                    yield "--> [Agent 4] Benchmark timed out waiting for a result."
                    break # Exit if no result in 5 mins

        except Exception as e:
            yield f"Error in benchmark agent setup: {e}"

In [3]:
class SearchQueryGenerator(BaseModel):
    queries: List[str] = Field(description="A list of targeted, keyword-focused search queries.")

def generate_search_queries(query: str, llm) -> List[str]:
    # This function remains unchanged.
    print("-> Using LLM with FULLY ABSTRACTED prompt to generate search queries...")
    prompt_template = PromptTemplate(
        template="""
        You are an expert at generating web search queries for a technical audience.
        Analyze the user's question to identify the core technical task and the programming language.
        Based on your knowledge, generate 5 concise, targeted search queries. Two queries should be for the standard, CPU-based library for that task/language. Three queries should be for potential GPU-accelerated libraries for that task/language, prioritizing NVIDIA-based solutions if they exist.

        User Question: "{question}"

        Generate a JSON list of 5 search query strings.
        """,
        input_variables=["question"],
    )
    query_generation_chain = prompt_template | llm.with_structured_output(SearchQueryGenerator)
    try:
        response_model = query_generation_chain.invoke({"question": query})
        print(f"-> Generated queries: {response_model.queries}")
        return response_model.queries
    except Exception as e:
        print(f"-> LLM failed to generate structured output: {e}")
        return []

def dynamic_search_agentic(queries: List[str]) -> list[str]:
    # This function remains unchanged.
    print("-> Executing dynamic search...")
    all_urls = set()
    for q in queries:
        try:
            enhanced_query = f"{q} site:developer.nvidia.com OR site:medium.com/rapids-ai OR site:medium.com/cupy-team"
            search_results = list(search(enhanced_query, num_results=2))
            for url in search_results:
                all_urls.add(url)
        except Exception as e:
            print(f"An error occurred during search for query '{q}': {e}")
            continue
    final_urls = [url for url in list(all_urls) if url]
    print(f"-> Found {len(final_urls)} unique URLs: {final_urls}")
    return final_urls

def _extract_python_code(markdown_text: str) -> Dict[str, str]:
    # This helper function remains unchanged.
    code_pattern = r"```python\n(.*?)\n```"
    gpu_heading_pattern = r"### Recommended GPU Solution.*?\n"
    cpu_heading_pattern = r"### Standard CPU Solution.*?\n"
    gpu_section_match = re.search(gpu_heading_pattern, markdown_text, re.DOTALL | re.IGNORECASE)
    cpu_section_match = re.search(cpu_heading_pattern, markdown_text, re.DOTALL | re.IGNORECASE)
    gpu_code = ""
    cpu_code = ""
    if gpu_section_match:
        section_start = gpu_section_match.end()
        code_match = re.search(code_pattern, markdown_text[section_start:], re.DOTALL)
        if code_match: gpu_code = code_match.group(1).strip()
    if cpu_section_match:
        section_start = cpu_section_match.end()
        code_match = re.search(code_pattern, markdown_text[section_start:], re.DOTALL)
        if code_match: cpu_code = code_match.group(1).strip()
    return {"cpu_code": cpu_code, "gpu_code": gpu_code}

def process_with_rag(query: str):
    # This is the main RAG pipeline, which is a generator function
    print("--- Running FINAL DYNAMIC RAG Pipeline ---")
    host_node = socket.gethostname()
    # Ensure you are using the correct user and port for your Ollama instance
    llm = ChatOllama(model="qwen3:14b", base_url=f"http://mrajanva@{host_node}:11434/")

    search_queries = generate_search_queries(query, llm)
    urls = dynamic_search_agentic(search_queries) if search_queries else []

    context_text = ""
    if urls:
        print("-> Found documents. Loading and processing context...")
        docs = [WebBaseLoader(url).load() for url in urls]
        docs_list = [item for sublist in docs for item in sublist]
        text_splitter = RecursiveCharacterTextSplitter.from_tiktoken_encoder(chunk_size=500, chunk_overlap=100)
        doc_splits = text_splitter.split_documents(docs_list)
        embedding_model = HuggingFaceEmbeddings(model_name="all-MiniLM-L6-v2")
        vectorstore = Chroma.from_documents(documents=doc_splits, embedding=embedding_model, collection_name="rag-chroma")
        retriever = vectorstore.as_retriever()
        retrieved_docs = retriever.invoke(query)
        context_text = "\n\n---\n\n".join([doc.page_content for doc in retrieved_docs])
        vectorstore.delete_collection()

    final_prompt_template = PromptTemplate(
        template="""
        You are a friendly and knowledgeable AI Tutor for a project focused on data science acceleration. Your primary mission is to educate users on leveraging NVIDIA-based GPU libraries.
        First, analyze the user's QUESTION to identify the core task and programming language. Based on your knowledge, determine if a common NVIDIA-based GPU-accelerated library (like cuPy, cuDF, Rapids) exists for that specific task and language.
        Then, follow the appropriate path below to structure your conversational and helpful answer:

        **PATH 1: An NVIDIA-based GPU-accelerated library EXISTS for this task.**
        1.  Identify the standard CPU library and the NVIDIA GPU library for the user's language and task.
        2.  Start with a friendly opening that explains you will show both the GPU-accelerated and standard methods.
        3.  Provide a heading for the GPU solution, dynamically inserting the library name (e.g., `### Recommended GPU Solution (with [GPU Library Name])`).
        4.  Write the code example for the GPU solution.
        5.  Add a "Performance Note" section. Explain the benefits of the GPU approach.
        6.  Provide a heading for the CPU solution, dynamically inserting the library name (e.g., `### Standard CPU Solution (with [CPU Library Name])`).
        7.  Write the CPU-based code for comparison.
        8.  Do NOT add any disclaimer note at the end.

        **PATH 2: An NVIDIA-based GPU-accelerated library DOES NOT EXIST for this task.**
        1.  Identify the standard library for the user's specified language and task.
        2.  Start with a friendly opening explaining the standard approach.
        3.  Provide a heading for the standard solution, dynamically inserting the library name (e.g., `### Standard Solution (with [Library Name])`).
        4.  Write the code example using the identified standard library.
        5.  End your entire response with the exact sentence: "Note: The provided solution is the standard method for this task, as a direct NVIDIA-based GPU library for it is not common."

        Use the CONTEXT below to inform your answer if it is relevant, but your primary instruction is to follow the mission and logic paths described above.

        CONTEXT:
        {context}

        QUESTION:
        {question}

        YOUR FINAL ANSWER:
        """,
        input_variables=["context", "question"],
    )

    final_chain = final_prompt_template | llm
    llm_response_text = final_chain.invoke({"context": context_text, "question": query}).content

    # 1. Immediately yield the main answer from the LLM
    print("SENDING GENERATED RESPONSE")
    yield llm_response_text

    extracted_code = _extract_python_code(llm_response_text)
    cpu_code = extracted_code.get("cpu_code")
    gpu_code = extracted_code.get("gpu_code")
    asurite_id = "mrajanva" # IMPORTANT: Make sure this is your ASURITE ID

    if cpu_code and gpu_code:
        print("--> [Agent 3] Both CPU and GPU code found. Invoking Agent 4 for benchmarking.")
        try:
            benchmarker = SolBenchmarker(user=asurite_id)
            benchmark_generator = benchmarker.run_benchmark_async(cpu_code, gpu_code)

            # --- MODIFIED SECTION: Handle the real-time stream from the benchmarker ---
            # Loop through the benchmark generator and yield each result or status update immediately.
            for item in benchmark_generator:
                if isinstance(item, str):
                    # This is a status update string. Yield it directly to the UI.
                    yield f"\n\n_{item}_"
                elif isinstance(item, dict):
                    # This is a completed job result. Format it as Markdown and yield it.
                    job_type = item.get("type")
                    exec_time = item.get("time")
                    time_str = f"{exec_time:.4f} seconds" if exec_time is not None else "N/A (script failed)"
                    benchmark_md_chunk = f"\n\n---\n### ✅ {job_type} Benchmark Complete\n"
                    benchmark_md_chunk += f"**Execution Time:** {time_str}\n"
                    yield benchmark_md_chunk
            # --- END OF MODIFIED SECTION ---

        except ValueError as e:
            yield f"\n\n---\n### ⚠️ Benchmark Skipped\nConfiguration error: {e}"
    else:
        print("--> [Agent 3] Did not find both CPU and GPU code. Skipping benchmark.")

    print("--- Pipeline Complete ---")

# Gradio

In [4]:
# =================================================================

# GRADIO CHAT APPLICATION FOR AI TUTOR (FINAL)

#

# This version fixes the UserWarning by specifying the modern 'messages'

# format for the chatbot component.

# =================================================================



import gradio as gr

import re

import time



# This wrapper function connects our backend logic to the Gradio UI.

# It assumes 'process_with_rag' is defined and available in the notebook environment.

def tutor_chat_interface(user_message, history):

    # This part remains mostly the same

    history.append({"role": "user", "content": user_message})

    history.append({"role": "assistant", "content": ""}) # Add an empty placeholder for the bot

    yield history, "Thinking..."



    # The RAG pipeline is now a generator

    response_generator = process_with_rag(user_message)



    # --- MODIFIED: Loop through the stream ---

    full_response_text = ""

    for chunk in response_generator:

        # Append each new piece of text to the full response

        full_response_text += chunk

        # Update the last message in the history with the new combined text

        history[-1]["content"] = full_response_text

        yield history, "Agent is working..." # Update the chatbot with the streaming text



# Build the Gradio UI using Blocks for more control

with gr.Blocks(theme=gr.themes.Soft(), css=".gradio-container {background-color: #f5f5f5;}") as demo:

    gr.Markdown("# 🤖 AI Accelerated Data Science Tutor")

    gr.Markdown("Ask a question about a data science task. The tutor will provide an explanation and code, prioritizing NVIDIA GPU-accelerated solutions where possible.")



    # THE FIX IS HERE: Added 'type="messages"' to the chatbot component.

    chatbot = gr.Chatbot(label="Conversation", height=450, bubble_full_width=False, type="messages")

    

    with gr.Accordion("🔎 Show Agent's Thought Process", open=False):

        cot_output = gr.Markdown("The agent's reasoning will appear here after it responds.")



    with gr.Row():

        msg_textbox = gr.Textbox(

            label="Your Question",

            placeholder="e.g., How do I multiply two 10x10 arrays in Python?",

            scale=4,

            autofocus=True,

            container=False # This makes the textbox look cleaner

        )

        submit_btn = gr.Button("Ask", variant="primary", scale=1, min_width=150)



    # Main function to handle the chat logic

    def handle_submit(user_message, chat_history):

        response_generator = tutor_chat_interface(user_message, chat_history)

        for history_state, thought_process in response_generator:

            yield history_state, thought_process



    # Connect the submit button and textbox to the handler function

    submit_btn.click(

        handle_submit, 

        [msg_textbox, chatbot], 

        [chatbot, cot_output]

    ).then(lambda: gr.update(value=""), None, [msg_textbox], queue=False)



    msg_textbox.submit(

        handle_submit, 

        [msg_textbox, chatbot], 

        [chatbot, cot_output]

    ).then(lambda: gr.update(value=""), None, [msg_textbox], queue=False)





# Launch the application

# Set share=True if you need a public link from the Sol jupyter notebook
demo.launch(share=True, debug=True)

SyntaxError: invalid non-printable character U+00A0 (1593259314.py, line 29)