# 🛡️ Cybersecurity QA Agent

This project is a CLI-based question-answering system focused on **cybersecurity topics**.  
It uses **LangChain**, **OpenAI models**, and **FAISS vector search** to retrieve and generate answers from transcript files.

### Features:
- Loads transcript `.txt` files and builds a vector database.
- Filters user questions using keyword and LLM-based classifiers.
- Responds using a ReAct agent with memory.
- Supports both **text** and **voice** input/output.
- Logs all interactions to a CSV file for review.

> Well suited to educational use, or as a study assistant, in the cybersecurity domain.


## 1. Import Required Libraries

In [1]:
import os
from dotenv import load_dotenv
from langchain.document_loaders import TextLoader
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain.vectorstores import FAISS
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain_core.messages import HumanMessage
from langchain_core.chat_history import InMemoryChatMessageHistory
from langchain_core.runnables.history import RunnableWithMessageHistory
from langchain.agents import tool
from langgraph.prebuilt import create_react_agent
from langchain.prompts import PromptTemplate
from openai import OpenAI
import speech_recognition as sr
from elevenlabs.client import ElevenLabs
from elevenlabs import play
from csv import DictWriter

## 2. Load Environment Variables and Initialize Clients

In [2]:
# ========== Load env and clients ==========
# Read variables from a local .env file into the process environment first,
# so the key lookups below can see them.
load_dotenv()
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Chat model used by the classifier, the answer prompts and the agent.
llm = ChatOpenAI(temperature=0.5, model="gpt-4o-mini")

# Direct OpenAI client and the ElevenLabs text-to-speech client.
client = OpenAI(api_key=OPENAI_API_KEY)
client_voice = ElevenLabs(api_key=os.getenv("ELEVENLABS_API_KEY"))

# Single in-memory chat history shared by the agent wrapper and the CLI loop.
message_history = InMemoryChatMessageHistory()


## 3. Build Vectorstore from Transcript Files

In [3]:
# ========== Build vectorstore ==========
def build_vectorstore_from_transcripts(folder_path, persist_path, openai_api_key):
    """Embed every ``.txt`` transcript in a folder into a FAISS index and save it.

    Args:
        folder_path: Directory scanned (non-recursively) for ``.txt`` files.
        persist_path: Location the FAISS index is written to.
        openai_api_key: API key handed to ``OpenAIEmbeddings``.

    Returns:
        The FAISS vector store that was built and saved.

    Raises:
        ValueError: If ``folder_path`` holds no ``.txt`` files, or the files
            produce no chunks to index.
    """
    # os.listdir returns entries in arbitrary order; sorting makes the index
    # build reproducible from run to run.
    transcript_names = sorted(
        name for name in os.listdir(folder_path) if name.endswith(".txt")
    )
    if not transcript_names:
        # Fail early with a clear message instead of an obscure error from
        # the embedding/indexing step further down.
        raise ValueError(f"No .txt transcripts found in {folder_path!r}")

    docs = []
    for name in transcript_names:
        loader = TextLoader(os.path.join(folder_path, name), encoding="utf-8")
        docs.extend(loader.load())

    # Small overlapping chunks (300 characters, 50 shared with the neighbour).
    splitter = RecursiveCharacterTextSplitter(chunk_size=300, chunk_overlap=50)
    split_docs = splitter.split_documents(docs)
    if not split_docs:
        raise ValueError(f"Transcripts in {folder_path!r} contain no text to index")

    embeddings = OpenAIEmbeddings(openai_api_key=openai_api_key)
    vectorstore = FAISS.from_documents(split_docs, embeddings)
    vectorstore.save_local(persist_path)
    return vectorstore


## 4. Filtering and Answering Questions

In [None]:
import re
from langchain.prompts import PromptTemplate

# Fast keyword-based filter for quick initial screening
CYBER_KEYWORDS = [
    "cyber", "security", "phishing", "malware", "encryption", "authentication", "authorization",
    "firewall", "password", "hacking", "threat", "breach", "hashing", "vpn"
]

# One pre-compiled alternation instead of building a regex per keyword on
# every call. Each keyword is anchored at the START of a word only: a closing
# \b would reject "cybersecurity", "passwords" or "firewalls", which are
# exactly the words this filter is meant to catch.
# NOTE: compiled once at import time, so later edits to CYBER_KEYWORDS are
# not picked up unless this pattern is rebuilt.
_CYBER_KEYWORD_RE = re.compile(
    r"\b(?:" + "|".join(re.escape(word) for word in CYBER_KEYWORDS) + r")"
)

# ✅ Basic keyword filter
def is_likely_cyber_keywords(text):
    """Return True if any word in ``text`` starts with a cybersecurity keyword.

    Matching is case-insensitive (the text is lower-cased first). A keyword
    in the middle of a word, such as "security" inside "insecurity", does not
    count.
    """
    return _CYBER_KEYWORD_RE.search(text.lower()) is not None

# ✅ Smart LLM-based filter
def is_likely_cyber_by_llm(query):
    """Ask the chat model whether ``query`` is a cybersecurity question.

    Returns True when the model's reply, once stripped and lower-cased,
    starts with "y"; False otherwise.
    """
    classifier_prompt = PromptTemplate.from_template("""
    You are a strict cybersecurity content classifier.

    Decide whether the following question is strictly about cybersecurity (e.g., threats, encryption, phishing, authentication, etc).

    Question: {question}

    Answer with only "yes" or "no".
    """)
    # The question is supplied as the chain input, which fills {question}.
    reply = (classifier_prompt | llm).invoke({"question": query})
    verdict = reply.content.strip().lower()
    return verdict.startswith("y")

# Values of ``contexts_used`` that mean no transcript text backed the answer;
# they are written to the log as an empty "contexts" column.
_NO_CONTEXT_MARKERS = ("", "N/A", "N/A (cyber-related, no strong match)")


def _ask_llm(template, **variables):
    """Fill ``template`` with ``variables``, send it to ``llm``, return the reply text."""
    prompt = PromptTemplate.from_template(template)
    return (prompt | llm).invoke(variables).content


def _log_interaction(query, answer, contexts_used):
    """Append one question/answer row to ``rag_log.csv`` on a best-effort basis.

    Any failure is reported on stdout and otherwise ignored, so a logging
    problem never prevents the answer from being returned.
    """
    try:
        # Save the context only if it was actually used
        row = {
            "query": query,
            "answer": answer,
            "contexts": (
                "" if contexts_used in _NO_CONTEXT_MARKERS
                else contexts_used.replace("\n", " ")
            ),
        }
        csv_file = "rag_log.csv"
        # Checked before opening: append mode creates the file, and the header
        # row must be written only for a brand-new log.
        file_exists = os.path.isfile(csv_file)

        with open(csv_file, mode="a", encoding="utf-8", newline="") as f:
            writer = DictWriter(f, fieldnames=["query", "answer", "contexts"])
            if not file_exists:
                writer.writeheader()
            writer.writerow(row)

        print("📁 Saved to rag_log.csv ✅")
    except Exception as e:  # deliberate catch-all: logging is best-effort
        print("❌ Failed to save CSV:", str(e))


def get_qa_tool_fn(vectorstore):
    """Build the question-answering function backed by ``vectorstore``.

    Args:
        vectorstore: Object exposing ``similarity_search_with_score(query, k)``
            that returns ``(document, score)`` pairs, best match first.

    Returns:
        ``qa_fn(query, history=None) -> str``, which retrieves context, routes
        the question by similarity and topic, logs the exchange and returns
        the answer text.
    """

    def qa_fn(query, history=None):
        """Answer ``query`` and log the exchange.

        ``history`` is accepted for interface compatibility but is not used.
        """
        print("\n🔥 qa_fn called directly")

        # ✅ Retrieve the top 3 most similar chunks
        docs_with_scores = vectorstore.similarity_search_with_score(query, k=3)

        similarity_score = 0.0
        context_text = ""

        if docs_with_scores:
            # Only the best hit's score drives the routing below.
            _, distance = docs_with_scores[0]
            # float() converts the score (presumably a NumPy float32) to a
            # plain Python float so the rounded value prints as e.g. 0.74
            # rather than 0.7400000095367432.
            # NOTE(review): "1 - distance" assumes the store reports a
            # distance where smaller means closer — confirm the index metric.
            similarity_score = 1 - float(distance)
            context_text = "\n\n".join(doc.page_content for doc, _ in docs_with_scores)

            print("\n🗍 Combined Top 3 Chunks:")
            print(context_text[:500])
        else:
            print("⚠️ No chunks found.")

        print(f"\n📊 Similarity Score: {round(similarity_score, 2)}")

        # ✅ Smart filter combining both methods (the LLM is asked only when
        # the cheap keyword check finds nothing)
        is_cyber = is_likely_cyber_keywords(query) or is_likely_cyber_by_llm(query)
        print(f"🔍 is_likely_cyber: {is_cyber}")

        # ✅ CASE 1: High similarity match — answer from the retrieved chunks
        if similarity_score > 0.65:
            answer = _ask_llm("""
            You are a cybersecurity tutor. ONLY answer based on the following:

            Context:
            {context}

            Question:
            {question}
            """, context=context_text, question=query)
            contexts_used = context_text

        # ✅ CASE 2: Medium similarity + relevant to cybersecurity
        elif similarity_score > 0.3 and is_cyber:
            answer = _ask_llm("""
            You are a cybersecurity tutor. This question is OUTSIDE the prepared playlist, but it's still related to cybersecurity.
            Answer clearly and concisely.

            Question:
            {question}
            """, question=query)
            contexts_used = "N/A (cyber-related, no strong match)"

        # ❌ CASE 3: Completely out of scope
        else:
            answer = "❌ This question is outside the scope of the cybersecurity playlist."
            contexts_used = "N/A"

        # ✅ Log the interaction in a CSV file
        _log_interaction(query, answer, contexts_used)

        return answer

    return qa_fn


## 5. Create Cybersecurity Agent with Memory

In [None]:
# ========== Agent Creation ==========
def create_agent_with_memory(vectorstore):
    """Create the ReAct agent and wrap it with the shared chat history.

    The agent gets a single tool that forwards each question to the QA
    function built from ``vectorstore``. The returned runnable reads its
    input from the "messages" key and is bound to the module-level
    ``message_history``.
    """
    answer_question = get_qa_tool_fn(vectorstore)

    @tool
    def CyberSecurityQA_tool(input: str) -> str:
        """
        YOU MUST use this tool for all questions. 
        Do NOT answer directly.
        """
        print("\n🔥 CyberSecurityQA_tool was called")

        # Out-of-scope questions are forwarded too; the QA function decides
        # how to respond to them.
        return answer_question(input)

    def shared_history():
        # Every call uses the same module-level history object.
        return message_history

    react_agent = create_react_agent(model=llm, tools=[CyberSecurityQA_tool])

    return RunnableWithMessageHistory(
        react_agent,
        shared_history,
        history_messages_key="history",
        input_messages_key="messages",
    )



## 6. Summarize Agent Response

In [7]:
# ========== Summarize response ==========
def summarize_response(response):
    """Print a recap of the latest exchange and deliver the answer.

    Shows the most recent human message, the name of the last tool the agent
    called, and the last non-empty AI message. The user is asked whether the
    answer should be shown as text ("1") or spoken ("2"); any other choice,
    or a failure while producing speech, falls back to text so the answer is
    never lost.

    Args:
        response: Mapping with a ``"messages"`` list whose items expose
            ``type`` and ``content`` and, optionally, ``tool_calls``.
    """
    messages = response['messages']

    print("\n📟 Summary:")
    human_messages = [m for m in messages if m.type == "human"]
    if human_messages:
        print(f"🧠 User: {human_messages[-1].content}")

    tool_calls = [m for m in messages if getattr(m, "tool_calls", None)]
    if tool_calls:
        print("🛠️ Tool Used:", tool_calls[-1].tool_calls[0]['name'])

    final_answer = [m for m in messages if m.type == "ai" and m.content.strip() != ""]
    if final_answer:
        answer_text = final_answer[-1].content.strip()
        text_voice = input("💬✅ put 1 for text and 2 for voice-output in answer part : ").strip()

        spoken = False
        if text_voice == "2":
            try:
                audio = client_voice.text_to_speech.convert(
                    text=answer_text,
                    voice_id="ThT5KcBeYPX3keUQqHPh",
                    model_id="eleven_multilingual_v2",
                    output_format="mp3_44100_128",
                )
                play(audio, use_ffmpeg=False)
                spoken = True
            except Exception as e:
                # Speech relies on an external service and local audio
                # playback; report the failure and fall back to text rather
                # than ending the session.
                print("❌ Voice output failed:", str(e))

        if spoken:
            print("🎤 AI Response:", answer_text)
        else:
            # Also reached for unrecognised choices, which previously
            # discarded the answer without showing it.
            print("🤖 AI Response:", answer_text)

    print("\n" + "=" * 50 + "\n")


## 7. Run the CLI Chat Loop


In [8]:
# ========== CLI Loop ==========
def _listen_for_question():
    """Record one utterance from the microphone and transcribe it.

    Returns:
        The recognised text, or ``None`` when the audio could not be
        understood or the recognition service could not be reached (a
        message is printed in either case).
    """
    recognizer = sr.Recognizer()
    with sr.Microphone() as source:
        print("🎤 Speak your question...")
        recognizer.adjust_for_ambient_noise(source)
        audio = recognizer.listen(source)

    # Transcribe only after the microphone has been released; the request
    # goes over the network and does not need the device.
    try:
        text = recognizer.recognize_google(audio, language='en-US')
    except sr.UnknownValueError:
        print("🤔 Could not understand the audio.")
        return None
    except sr.RequestError:
        print("❌ Could not request results from the speech recognition service.")
        return None

    print("📝 You said:", text)
    return text


def _drop_orphan_tool_messages(messages):
    """Remove leading tool-result messages left behind by truncating history.

    A tool result whose originating AI tool call was sliced off is rejected
    by the chat API, so such messages are stripped from the front.
    """
    start = 0
    while start < len(messages) and messages[start].type == "tool":
        start += 1
    return messages[start:]


def run_cli(persist_path="vector_db"):
    """Run the interactive question/answer loop until the user exits.

    Args:
        persist_path: Location of the saved FAISS index to load.

    Each turn asks whether the question is typed ("1") or spoken ("2"), sends
    it to the agent together with up to five earlier messages, and hands the
    result to ``summarize_response``. Typing "exit" or "quit" ends the loop.
    """
    exit_words = ("exit", "quit")

    # NOTE(security): allow_dangerous_deserialization=True permits
    # pickle-based loading of the stored index. Only load an index this
    # program created itself, never one from an untrusted source.
    vectorstore = FAISS.load_local(
        persist_path,
        OpenAIEmbeddings(openai_api_key=OPENAI_API_KEY),
        allow_dangerous_deserialization=True
    )

    agent_with_history = create_agent_with_memory(vectorstore)

    print("\n🔐 Cybersecurity Agent Ready! Ask anything or request a quiz. Type 'exit' to quit.\n")

    while True:
        text_voice = input("⁉️ put 1 for text and 2 for voice-input in question part: ").strip()
        if text_voice.lower() in exit_words:
            # Allow leaving from the mode prompt as well as the question prompt.
            break

        if text_voice == "1":
            user_input = input("🧠 Your question: ")
        elif text_voice == "2":
            user_input = _listen_for_question()
            if user_input is None:
                continue
        else:
            # Previously any other input silently opened the microphone.
            print("⚠️ Please enter 1 or 2.")
            continue

        user_input = user_input.strip()
        if not user_input:
            # Nothing to ask; do not spend an agent call on an empty question.
            continue
        if user_input.lower() in exit_words:
            break

        previous_messages = _drop_orphan_tool_messages(message_history.messages[-5:])
        new_message = HumanMessage(content=user_input)
        all_messages = previous_messages + [new_message]

        response = agent_with_history.invoke({"messages": all_messages})
        summarize_response(response)

## 8. Entry Point


In [9]:
# ========== Run Main ==========
# Build the index only when no saved copy exists yet; later runs reuse it.
vector_db_missing = not os.path.exists("vector_db")
if vector_db_missing:
    build_vectorstore_from_transcripts(
        folder_path="transcripts",
        persist_path="vector_db",
        openai_api_key=OPENAI_API_KEY,
    )

run_cli()


🔐 Cybersecurity Agent Ready! Ask anything or request a quiz. Type 'exit' to quit.


🔥 CyberSecurityQA_tool was called

🔥 qa_fn called directly

🗍 Combined Top 3 Chunks:
as well which is in general probably a good thing two factor authentication or two FA more generally known as multi factor authentication is a technology whereby in addition to having one factor that you use to log in like your password as is tradition you also have a second or maybe more factors

solutions especially for popular websites like Google and Facebook and others is that if I already have an account with Google or Facebook and hopefully I already have a good password for both of those

📊 Similarity Score: 0.7400000095367432
🔍 is_likely_cyber: True
📁 Saved to rag_log.csv ✅

📟 Summary:
🧠 User: How does two-factor authentication improve account security?
🛠️ Tool Used: CyberSecurityQA_tool
🤖 AI Response: Two-factor authentication (2FA) improves account security by requiring users to provide two distinct forms of

# 🙏 Thanks 💙