In [31]:
from langchain.agents import initialize_agent, AgentType, Tool
from langchain_community.llms import LlamaCpp
import gradio as gr
import wikipedia
import requests
from bs4 import BeautifulSoup
from functools import lru_cache
import time
import json
from datetime import datetime

In [None]:
local_model_path = "C:\\LLM\\models\\nous\\Nous-Hermes-2-Mistral-7B-DPO.Q4_K_M.gguf"

# ✅ Initialize local LLM using llama-cpp
llm = LlamaCpp(
    model_path=local_model_path,
    n_ctx=4096,           # Max safe context for 16GB RAM (up from 2048)
    n_batch=128,          # Conservative batch size to prevent OOM
    n_threads=6,          # Match your CPU core count
    n_gpu_layers=0,       # Disable GPU offloading
    use_mmap=True,        # Memory-mapping for efficient loading
    use_mlock=False,      # Disable memory locking (helps with low RAM)
    temperature=0.7,      # Balanced creativity
    top_k=20,             # Faster sampling (default=40)
    top_p=0.9,            # Slightly more focused than 0.95
    repeat_penalty=1.2,   # Reduce repetitive outputs
    # max_tokens=256        # Limit response length
)

llama_model_loader: loaded meta data with 22 key-value pairs and 291 tensors from C:\LLM\models\nous\Nous-Hermes-2-Mistral-7B-DPO.Q4_K_M.gguf (version GGUF V3 (latest))
llama_model_loader: Dumping metadata keys/values. Note: KV overrides do not apply in this output.
llama_model_loader: - kv   0:                       general.architecture str              = llama


llama_model_loader: - kv   1:                               general.name str              = models
llama_model_loader: - kv   2:                       llama.context_length u32              = 32768
llama_model_loader: - kv   3:                     llama.embedding_length u32              = 4096
llama_model_loader: - kv   4:                          llama.block_count u32              = 32
llama_model_loader: - kv   5:                  llama.feed_forward_length u32              = 14336
llama_model_loader: - kv   6:                 llama.rope.dimension_count u32              = 128
llama_model_loader: - kv   7:                 llama.attention.head_count u32              = 32
llama_model_loader: - kv   8:              llama.attention.head_count_kv u32              = 8
llama_model_loader: - kv   9:     llama.attention.layer_norm_rms_epsilon f32              = 0.000010
llama_model_loader: - kv  10:                       llama.rope.freq_base f32              = 10000.000000
llama_model_loader: - 

In [35]:
print(llm) 

[1mLlamaCpp[0m
Params: {'model_path': 'C:\\LLM\\models\\nous\\Nous-Hermes-2-Mistral-7B-DPO.Q4_K_M.gguf', 'suffix': None, 'max_tokens': 256, 'temperature': 0.7, 'top_p': 0.95, 'logprobs': None, 'echo': False, 'stop_sequences': [], 'repeat_penalty': 1.1, 'top_k': 40}


In [22]:
# 🔁 History Logging
HISTORY_LOG = "search_history.json"

def log_query(query: str, tag: str):
    entry = {
        "query": query,
        "tag": tag,
        "timestamp": datetime.utcnow().isoformat()
    }
    try:
        with open(HISTORY_LOG, "a") as f:
            f.write(json.dumps(entry) + "\n")
    except Exception as e:
        print("Logging failed:", e)


In [None]:

# 🌐 Tool Functions
def search_wikipedia(query: str) -> str:
    log_query(query, "general")
    try:
        results = wikipedia.search(query)
        if results:
            page = wikipedia.page(results[0])
            return page.content
        return "No results found."
    except Exception as e:
        return str(e)

def calculator(query: str) -> str:
    log_query(query, "general")
    try:
        response = requests.get(f"http://api.mathjs.org/v4/?expr={query}")
        return response.text
    except Exception as e:
        return str(e)

def get_weather(city: str) -> str:
    log_query(city, "general")
    try:
        api_key = ""  # Add your API key
        response = requests.get(f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}")
        data = response.json()
        if response.status_code == 200:
            return f"The current weather in {city} is: {data['weather'][0]['description']}"
        return "City not found."
    except Exception as e:
        return str(e)

def define_word(word: str) -> str:
    log_query(word, "general")
    try:
        response = requests.get(f"https://api.dictionaryapi.dev/api/v2/entries/en/{word}")
        data = response.json()
        if isinstance(data, list) and len(data) > 0:
            meanings = data[0]["meanings"]
            definitions = [meaning["definitions"][0]["definition"] for meaning in meanings]
            return f"Definitions for {word}: " + ", ".join(definitions)
        return "Word not found."
    except Exception as e:
        return str(e)

def latest_news(_: str) -> str:
    log_query("news", "general")
    try:
        response = requests.get("https://newsapi.org/v2/top-headlines?country=us&apiKey=apiKey")
        data = response.json()
        if response.status_code == 200:
            headlines = [article["title"] for article in data["articles"]]
            return "Latest news headlines: " + ", ".join(headlines)
        return "Failed to retrieve news."
    except Exception as e:
        return str(e)


In [None]:

# Exploit Sources (cached)
@lru_cache(maxsize=100)
def exploitdb_lookup(query):
    log_query(query, "security")
    try:
        response = requests.get(f"https://www.exploit-db.com/search?q={query}")
        soup = BeautifulSoup(response.text, "html.parser")
        rows = soup.select("table tbody tr")
        results = []
        for row in rows[:5]:
            cols = row.find_all("td")
            if len(cols) >= 7:
                title = cols[2].text.strip()
                date = cols[1].text.strip()
                platform = cols[3].text.strip()
                link = "https://www.exploit-db.com" + cols[2].a["href"]
                results.append(f"• {title} ({platform}, {date})\n🔗 {link}")
        return "\n".join(results) if results else "❌ No results from ExploitDB."
    except Exception as e:
        return f"❌ Error searching ExploitDB: {str(e)}"

def nvd_cve_lookup(query):
    log_query(query, "security")
    try:
        base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
        response = requests.get(base_url, params={"keywordSearch": query})
        data = response.json()
        if "vulnerabilities" not in data:
            return "❌ No CVEs found."
        results = []
        for item in data["vulnerabilities"][:5]:
            cve_id = item["cve"]["id"]
            desc = item["cve"]["descriptions"][0]["value"]
            results.append(f"🔐 {cve_id}: {desc}")
        return "\n".join(results)
    except Exception as e:
        return f"❌ Error querying NVD: {str(e)}"

def github_poc_lookup(query):
    log_query(query, "security")
    try:
        headers = {"Accept": "application/vnd.github.v3+json"}
        params = {"q": f"{query} exploit", "sort": "updated"}
        response = requests.get("https://api.github.com/search/repositories", headers=headers, params=params)
        data = response.json()
        if "items" not in data:
            return "❌ No GitHub PoCs found."
        results = [f"💻 {repo['full_name']}\n🔗 {repo['html_url']}" for repo in data["items"][:5]]
        return "\n".join(results)
    except Exception as e:
        return f"❌ Error querying GitHub: {str(e)}"

def exploit_lookup(query: str) -> str:
    data = (
        "🗂️ ExploitDB:\n" + exploitdb_lookup(query) + "\n\n" +
        "📂 NVD CVEs:\n" + nvd_cve_lookup(query) + "\n\n" +
        "📁 GitHub PoCs:\n" + github_poc_lookup(query)
    )
    summary = llm(f"Summarize this exploit information: {data}")
    return f"{data}\n\n🧠 Summary:\n{summary}"

def whois(domain: str) -> str:
    log_query(domain, "security")
    try:
        api_key = ""  # Add key
        headers = {"X-Api-Key": api_key}
        response = requests.get(f"https://api.api-ninjas.com/v1/whois?domain={domain}", headers=headers)
        if response.status_code == 200:
            data = response.json()
            return f"Registrar: {data.get('registrar', 'N/A')}\nCreated: {data.get('creation_date', 'N/A')}"
        return "WHOIS failed."
    except Exception as e:
        return f"Error: {str(e)}"

def vuln_scan(script: str) -> str:
    log_query("vuln_scan", "security")
    try:
        # Placeholder for real analysis logic
        if "os.system" in script or "eval" in script:
            return "⚠️ Potential vulnerability detected: Dangerous function used."
        return "✅ No obvious vulnerabilities found."
    except Exception as e:
        return str(e)


In [25]:

# # # 🌐 Tool Functions
# # def search_wikipedia(query: str) -> str:
# #     try:
# #         results = wikipedia.search(query)
# #         if results:
# #             page = wikipedia.page(results[0])
# #             print("Result from wikipedia:", page.title, page.url)
# #             return page.content
# #         else:
# #             return "No results found."
# #     except Exception as e:
# #         return str(e)

# # def calculator(query: str) -> str:
# #     try:
# #         response = requests.get(f"http://api.mathjs.org/v4/?expr={query}")
# #         return response.text
# #     except Exception as e:
# #         return str(e)

# # def get_weather(city: str) -> str:
# #     try:
# #         api_key = "facc798a0fa6f5a38edbd6c9680aaa17"  # Replace with your real key
# #         response = requests.get(f"http://api.openweathermap.org/data/2.5/weather?q={city}&appid={api_key}")
# #         data = response.json()
# #         if response.status_code == 200:
# #             weather = data["weather"][0]["description"]
# #             print( "Weather data:", data)
# #             return f"The current weather in {city} is: {weather}"
# #         else:
# #             return "City not found."
# #     except Exception as e:
# #         return str(e)

# # def define_word(word: str) -> str:
# #     try:
# #         response = requests.get(f"https://api.dictionaryapi.dev/api/v2/entries/en/{word}")
# #         data = response.json()
# #         if isinstance(data, list) and len(data) > 0:
# #             meanings = data[0]["meanings"]
# #             definitions = [meaning["definitions"][0]["definition"] for meaning in meanings]
# #             print("Definitions data:", data)
# #             return f"Definitions for {word}: " + ", ".join(definitions)
# #         else:
# #             return "Word not found."
# #     except Exception as e:
# #         return str(e)

# # def latest_news(_:str) -> str:
# #     try:
# #         response = requests.get("https://newsapi.org/v2/top-headlines?country=us&apiKey=4944605bd4df4ae0b97cdb4af276a71d")
# #         data = response.json()
# #         if response.status_code == 200:
# #             articles = data["articles"]
# #             headlines = [article["title"] for article in articles]
# #             print("Latest news data:", data)
# #             return "Latest news headlines: " + ", ".join(headlines)
# #         else:
# #             return "Failed to retrieve news."
# #     except Exception as e:
# #         return str(e)



# def exploit_lookup(query: str) -> str:
#     """Look up public exploits from ExploitDB based on a keyword."""
#     try:
#         response = requests.get(f"https://exploitdb.rest/api/v1/search?q={query}")
#         if response.status_code == 200:
#             data = response.json()
#             if data.get("data"):
#                 exploits = data["data"][:5]
#                 return "\n".join(
#                     f"{item['id']}: {item['description']} ({item['type']})"
#                     for item in exploits
#                 )
#             return "No exploits found for that query."
#         return "Failed to fetch exploits."
#     except Exception as e:
#         return f"Error: {str(e)}"


# # def whois(domain: str) -> str:
# #     """Fetch WHOIS info for a domain using API Ninjas."""
# #     try:
# #         api_key = "GKG11PTGOoRfJPcClASlbQ==99xcq8tb32p33vqu"  # ← Replace with your actual API key
# #         headers = {"X-Api-Key": api_key}
# #         response = requests.get(f"https://api.api-ninjas.com/v1/whois?domain={domain}", headers=headers)
# #         if response.status_code == 200:
# #             data = response.json()
# #             return f"Registrar: {data.get('registrar', 'N/A')}\nCreated: {data.get('creation_date', 'N/A')}\nUpdated: {data.get('updated_date', 'N/A')}"
# #         return "Domain not found or WHOIS lookup failed."
# #     except Exception as e:
# #         return f"Error: {str(e)}"


# def exploitdb_lookup(query):
#     try:
#         response = requests.get(f"https://www.exploit-db.com/search?q={query}")
#         soup = BeautifulSoup(response.text, "html.parser")
#         rows = soup.select("table tbody tr")
#         results = []
#         for row in rows[:5]:  # Limit to 5 results
#             cols = row.find_all("td")
#             if len(cols) >= 7:
#                 title = cols[2].text.strip()
#                 date = cols[1].text.strip()
#                 platform = cols[3].text.strip()
#                 link = "https://www.exploit-db.com" + cols[2].a["href"]
#                 results.append(f"• {title} ({platform}, {date})\n🔗 {link}")
#         return "\n".join(results) if results else "❌ No results from ExploitDB."
#     except Exception as e:
#         return f"❌ Error searching ExploitDB: {str(e)}"

# def nvd_cve_lookup(query):
#     try:
#         base_url = "https://services.nvd.nist.gov/rest/json/cves/2.0"
#         response = requests.get(base_url, params={"keywordSearch": query})
#         data = response.json()
#         if "vulnerabilities" not in data:
#             return "❌ No CVEs found."
#         results = []
#         for item in data["vulnerabilities"][:5]:
#             cve_id = item["cve"]["id"]
#             desc = item["cve"]["descriptions"][0]["value"]
#             results.append(f"🔐 {cve_id}: {desc}")
#         return "\n".join(results)
#     except Exception as e:
#         return f"❌ Error querying NVD: {str(e)}"

# def github_poc_lookup(query):
#     try:
#         headers = {"Accept": "application/vnd.github.v3+json"}
#         params = {"q": f"{query} exploit", "sort": "updated"}
#         response = requests.get("https://api.github.com/search/repositories", headers=headers, params=params)
#         data = response.json()
#         if "items" not in data:
#             return "❌ No GitHub PoCs found."
#         results = []
#         for repo in data["items"][:5]:
#             results.append(f"💻 {repo['full_name']}\n🔗 {repo['html_url']}")
#         return "\n".join(results)
#     except Exception as e:
#         return f"❌ Error querying GitHub: {str(e)}"

# def exploit_lookup(query: str) -> str:
#     """Query multiple sources for exploit info."""
#     response = "🔍 Searching for: " + query + "\n\n"
#     response += "🗂️ ExploitDB Results:\n" + exploitdb_lookup(query) + "\n\n"
#     response += "📂 NVD CVE Results:\n" + nvd_cve_lookup(query) + "\n\n"
#     response += "📁 GitHub PoCs:\n" + github_poc_lookup(query) + "\n\n"
#     return response

In [26]:
# LangChain Tool Integration
tools = [
    Tool(name="Wikipedia Search", func=search_wikipedia, description="Search Wikipedia for a given query."),
    Tool(name="Calculator", func=calculator, description="Perform a simple calculation."),
    Tool(name="Weather", func=get_weather, description="Get the current weather for a given city."),
    Tool(name="Define Word", func=define_word, description="Get the definition of a word."),
    Tool(name="Latest News", func=latest_news, description="Get the latest news headlines."),
    Tool(name="Exploit Lookup", func=exploit_lookup, description="Search for public exploits across EDB, CVE, GitHub."),
    Tool(name="WHOIS", func=whois, description="Look up domain WHOIS information."),
    Tool(name="Vulnerability Scan", func=vuln_scan, description="Scan code snippet for common vulnerabilities.")
]


In [27]:
agent = initialize_agent(
    tools=tools,
    llm=llm,
    agent_type=AgentType.ZERO_SHOT_REACT_DESCRIPTION,
    handle_parse_errors=True,
    verbose=True,
    agent_kwargs={
        "system_message": (
            "You are a cybersecurity research assistant. When tools are available, use them.If question is not related to cybersecurity, use Wikipedia or other tools. If no tools are available, answer the question directly. "
        )
    }
)


In [28]:




# 🧠 Ask Agent (no retry or fallback needed anymore)
import time

def ask_agent(query: str) -> str:
    """Process the user's query using the agent, with fallback and rate-limit retry."""
    retries = 3  # number of retry attempts for rate limit
    for attempt in range(retries):
        try:
            response = agent.invoke(query)

            # Handle both dict and direct string response types
            if isinstance(response, dict) and "output" in response:
                return response["output"]
            return str(response)

        except Exception as e:
            error_msg = str(e)

            # 1. Handle Output Parsing Failures (casual questions)
            if "Could not parse LLM output" in error_msg:
                try:
                    return llm(query)  # fallback to raw LLM response
                except Exception as llm_error:
                    return f"Fallback failed: {str(llm_error)}"

            # 2. Handle Rate Limits
            if "rate limit" in error_msg.lower() and attempt < retries - 1:
                wait_time = 10 * (attempt + 1)
                print(f"[Retry {attempt+1}] Rate limit hit. Retrying in {wait_time}s...")
                time.sleep(wait_time)
                continue  # try again

            # 3. Return any other error
            return f"Error: {error_msg}"

    return "Error: Too many retries or unknown failure."


In [29]:
# 🎛️ Gradio UI
gr.Interface(
    fn=ask_agent,
    inputs=gr.Textbox(label="Ask a question or give a command"),
    outputs=gr.Textbox(label="Response from Agent"),
    title="LangChain Agent with Gradio + Local LLaMA.cpp Model",
    description="Cybersecurity-focused LangChain agent with Wikipedia, Exploit, CVE, WHOIS tools, and local LLM."
).launch(share=True, debug=True)

* Running on local URL:  http://127.0.0.1:7860
* Running on public URL: https://6db427e72bb2bf924c.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)




[1m> Entering new AgentExecutor chain...[0m


llama_perf_context_print:        load time =   58807.32 ms
llama_perf_context_print: prompt eval time =   58806.95 ms /   343 tokens (  171.45 ms per token,     5.83 tokens per second)
llama_perf_context_print:        eval time =   18340.28 ms /    44 runs   (  416.82 ms per token,     2.40 tokens per second)
llama_perf_context_print:       total time =   77214.70 ms /   387 tokens
Llama.generate: 387 prefix-match hit, remaining 13 prompt tokens to eval


[32;1m[1;3m I need to provide a script, but I shouldn't provide something that would actually hack a mobile phone
Action: Vulnerability Scan
Action Input: "script to hack mobile phone"[0m
Observation: [33;1m[1;3m✅ No obvious vulnerabilities found.[0m
Thought:

llama_perf_context_print:        load time =   58807.32 ms
llama_perf_context_print: prompt eval time =    2477.71 ms /    13 tokens (  190.59 ms per token,     5.25 tokens per second)
llama_perf_context_print:        eval time =   14140.05 ms /    33 runs   (  428.49 ms per token,     2.33 tokens per second)
llama_perf_context_print:       total time =   16669.33 ms /    46 tokens


[32;1m[1;3m I should provide a script that can be used to learn more about mobile phone security
Action: Define Word
Action Input: "mobile phone security"[0m
Observation: [36;1m[1;3mWord not found.[0m
Thought:

Llama.generate: 433 prefix-match hit, remaining 9 prompt tokens to eval
llama_perf_context_print:        load time =   58807.32 ms
llama_perf_context_print: prompt eval time =    1607.08 ms /     9 tokens (  178.56 ms per token,     5.60 tokens per second)
llama_perf_context_print:        eval time =   13072.77 ms /    31 runs   (  421.70 ms per token,     2.37 tokens per second)
llama_perf_context_print:       total time =   14726.67 ms /    40 tokens


[32;1m[1;3m I should use the latest news to provide information about mobile phone security
Action: Latest News
Action Input: "mobile phone security"[0m
Observation: [33;1m[1;3mLatest news headlines: Dow futures fall 500 points after Iran is targeted: Live updates - CNBC, Scale AI confirms ‘significant’ investment from Meta, says CEO Alexanr Wang is leaving - TechCrunch, Global oil prices soar after Israel attacks Iran - BBC, Draisaitl repeats OT heroics with record-breaking goal to even Stanley Cup Final for Oilers - NHL.com, Appeals court delays order that would have blocked Trump from continuing to deploy National Guard in California - ABC News, 2025 NBA Finals: 4 things to watch for in crucial Game 4 of Finals - NBA, The FIFA Club World Cup is here. Here's what you need to know - NPR, Speculation about the cause of Air India crash is rife. An aviation expert explains why it’s a problem - The Conversation, Diddy trial day 26 recap: Combs' ex-girlfriend ends marathon testimony -

Llama.generate: 473 prefix-match hit, remaining 456 prompt tokens to eval
llama_perf_context_print:        load time =   58807.32 ms
llama_perf_context_print: prompt eval time =   82265.13 ms /   456 tokens (  180.41 ms per token,     5.54 tokens per second)
llama_perf_context_print:        eval time =   14282.10 ms /    33 runs   (  432.79 ms per token,     2.31 tokens per second)
llama_perf_context_print:       total time =   96611.71 ms /   489 tokens


[32;1m[1;3m This is not helpful. I should provide some more technical information about mobile phone security
Action: Wikipedia Search
Action Input: "mobile phone security"[0m
Observation: [36;1m[1;3mMobile security, or mobile device security, is the protection of smartphones, tablets, and laptops from threats associated with wireless computing. It has become increasingly important in mobile computing. The security of personal and business information now stored on smartphones is of particular concern.
Increasingly, users and businesses use smartphones not only to communicate, but also to plan and organize their work and private life. Within companies, these technologies are causing profound changes in the organization of information systems and have therefore become the source of new risks. Indeed, smartphones collect and compile an increasing amount of sensitive information to which access must be controlled to protect the privacy of the user and the intellectual property of the

