# Overview

[Sequential Process](https://docs.crewai.com/core-concepts/Processes/#sequential-process)
* This method mirrors dynamic team workflows, progressing through tasks in a thoughtful and systematic manner. Task execution follows the predefined order in the task list, with the output of one task serving as context for the next.
* To customize task context, utilize the context parameter in the Task class to specify outputs that should be used as context for subsequent tasks.

Async execution is used for the task that searches NVD for similar CVEs; it is currently commented out for the task that searches the Top25 CWE Mappings.

In [1]:

    #vulnerability_description = "A buffer overflow vulnerability in the XYZ software allows remote attackers to execute arbitrary code."
    

    #vulnerability_description = "The Widget Connector macro in Atlassian Confluence Server before version 6.6.12 (the fixed version for 6.6.x), from version 6.7.0 before 6.12.3 (the fixed version for 6.12.x), from version 6.13.0 before 6.13.3 (the fixed version for 6.13.x), and from version 6.14.0 before 6.14.2 (the fixed version for 6.14.x), allows remote attackers to achieve path traversal and remote code execution on a Confluence Server or Data Center instance via server-side template injection."
    #vulnerability_description = "The Popup Manager WordPress plugin through 1.6.6 does not have authorisation and CSRF check when creating/updating popups, and is missing sanitisation as well as escaping, which could allow unauthenticated attackers to create arbitrary popups and add Stored XSS payloads as well"
    #vulnerability_description = "ProductX contains a default SSH public key in the authorized_keys file. A remote attacker could use this key to gain root privileges."
    #vulnerability_description = "Due to insufficient input validation, SAP Employee Self Service allows an authenticated attacker with user privileges to alter employee number. On successful exploitation, the attacker can view personal details of other users causing a limited impact on confidentiality of the application."
    ##vulnerability_description = "An out-of-bounds write vulnerability exists in the TIFF header count-processing functionality of Accusoft ImageGear 19.8. A specially crafted malformed file can lead to memory corruption. An attacker can provide a malicious file to trigger this vulnerability."
    #vulnerability_description = "A stack-based buffer overflow vulnerability exists in the Web Manager SslGenerateCSR functionality of Lantronix PremierWave 2050 8.9.0.0R4 (in QEMU). A specially crafted HTTP request can lead to remote code execution. An attacker can make an authenticated HTTP request to trigger this vulnerability."
    
    
    #vulnerability_description = "The Cisco Discovery Protocol implementation in Cisco IOS XR Software does not do improper validation of string input from certain fields which could allow an unauthenticated, adjacent attacker to execute arbitrary code or cause a reload on an affected device. The vulnerability is due to improper validation of string input from certain fields in Cisco Discovery Protocol messages. An attacker could exploit this vulnerability by sending a malicious Cisco Discovery Protocol packet to an affected device."
    #this is a good test case for CSV Semantic Search Tool because the top25 does not have this exact string " improper validation of string input"
     
    #vulnerability_description = "_functions.php in cpCommerce 1.2.x, possibly including 1.2.9, sends a redirect but does not exit when it is called directly, which allows remote attackers to bypass a protection mechanism to conduct remote file inclusion and directory traversal attacks, execute arbitrary PHP code, or read arbitrary files via the GLOBALS[prefix] parameter, a different vector than CVE-2003-1500."
    #This CVE has 5 impacts, and one ambiguous weakness
    
    #vulnerability_description = "Apache Log4j2 2.0-beta9 through 2.15.0 (excluding security releases 2.12.2, 2.12.3, and 2.3.1) JNDI features used in configuration, log messages, and parameters do not protect against attacker controlled LDAP and other JNDI related endpoints. An attacker who can control log messages or log message parameters can execute arbitrary code loaded from LDAP servers when message lookup substitution is enabled. From log4j 2.15.0, this behavior has been disabled by default. From version 2.16.0 (along with 2.12.2, 2.12.3, and 2.3.1), this functionality has been completely removed. Note that this vulnerability is specific to log4j-core and does not affect log4net, log4cxx, or other Apache Logging Services projects."
    #Log4shell
    
    # Active test case: heap-based buffer overflow in FortiOS / FortiProxy SSL-VPN.
    # This module-level value is interpolated into the task descriptions when
    # the Task objects are constructed later in the notebook.
    vulnerability_description = "A heap-based buffer overflow vulnerability [CWE-122] in FortiOS SSL-VPN 7.2.0 through 7.2.2, 7.0.0 through 7.0.8, 6.4.0 through 6.4.10, 6.2.0 through 6.2.11, 6.0.15 and earlier and FortiProxy SSL-VPN 7.2.0 through 7.2.1, 7.0.7 and earlier may allow a remote unauthenticated attacker to execute arbitrary code or commands via specifically crafted requests."

In [2]:

from typing import List
import asyncio

from dotenv import dotenv_values
from dotenv import load_dotenv
import os
import requests
import json
from typing import List, Dict
import pandas as pd

In [3]:
# Load the default .env into the process environment, then read ../.env
# separately into a plain mapping that the lookups below index into.
load_dotenv()
config = dotenv_values("../.env")

# Export the LLM provider keys through the environment and keep the Langtrace
# key as a module constant. A key that is absent from ../.env raises KeyError.
os.environ.update(
    OPENAI_API_KEY=config["OPENAI_API_KEY"],
    ANTHROPIC_API_KEY=config["ANTHROPIC_API_KEY"],
)
LANGTRACE_API_KEY = config["LANGTRACE_API_KEY"]

#os.environ["AGENTOPS_API_KEY"] = config['AGENTOPS_API_KEY']
#os.environ["GOOGLE_API_KEY"] = config['GOOGLE_API_KEY']
#os.environ["SERPER_API_KEY"] = config['SERPER_API_KEY']

#If using env vars
#OPENAI_API_KEY=config['OPENAI_API_KEY']
#ANTHROPIC_API_KEY=config['ANTHROPIC_API_KEY']
#GOOGLE_API_KEY=config['GOOGLE_API_KEY']



In [4]:
# Initialise Langtrace tracing with the key read from ../.env.
# This cell must run before any LLM module (crewai, langchain_*) is imported --
# presumably so the SDK can instrument those libraries; confirm against the
# Langtrace documentation.

from langtrace_python_sdk import langtrace
from langtrace_python_sdk.utils.with_root_span import with_langtrace_root_span
langtrace.init(api_key = LANGTRACE_API_KEY)


Initializing Langtrace SDK..
⭐ Leave our github a star to stay on top of our updates - https://github.com/Scale3-Labs/langtrace


  from .autonotebook import tqdm as notebook_tqdm


In [5]:
# Directory that receives every task output file and the crew log; the paths
# are built with os.path.join(OUTPUT_DIR, ...) where the tasks and crew are defined.
# NOTE(review): nothing visible in this notebook creates the directory -- confirm it exists.
OUTPUT_DIR = "./data_out/"


In [6]:


from crewai import Agent, Task, Crew, Process
from crewai_tools import JSONSearchTool
from crewai_tools import WebsiteSearchTool
#from crewai_tools import SerperDevTool
from crewai_tools import tool
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from crewai_tools import CSVSearchTool

Exporting spans to Langtrace cloud..
[2024-09-12 10:43:10][DEBUG]: == Working Agent: Keyphrases Extractor
[2024-09-12 10:43:10][INFO]: == Starting Task: Extract key phrases from the following vulnerability description: A heap-based buffer overflow vulnerability [CWE-122] in FortiOS SSL-VPN 7.2.0 through 7.2.2, 7.0.0 through 7.0.8, 6.4.0 through 6.4.10, 6.2.0 through 6.2.11, 6.0.15 and earlier and FortiProxy SSL-VPN 7.2.0 through 7.2.1, 7.0.7 and earlier may allow a remote unauthenticated attacker to execute arbitrary code or commands via specifically crafted requests. and provide the details in a structured format.


> Entering new CrewAgentExecutor chain...
Thought: I now can give a great answer

Final Answer:

[WEAKNESS]: Heap-based buffer overflow vulnerability [CWE-122]

[PRODUCT]: 
- FortiOS SSL-VPN
- FortiProxy SSL-VPN

[VERSION]:
FortiOS SSL-VPN:
- 7.2.0 through 7.2.2
- 7.0.0 through 7.0.8
- 6.4.0 through 6.4.10
- 6.2.

In [7]:
# Initialize the language model shared by every agent defined in this notebook.
# The commented-out line is the OpenAI alternative.
#llm = ChatOpenAI(temperature=0)
llm = ChatAnthropic(model="claude-3-5-sonnet-20240620")

In [8]:
#==================================================
# Define the tools
#==================================================

# Configure WebsiteSearchTool to search NVD
#nvd_search_tool = WebsiteSearchTool(site_url="https://nvd.nist.gov/vuln/")
#nvd_search_tool = WebsiteSearchTool(site_url="https://nvd.nist.gov/vuln/search/results?form_type=Basic&results_type=overview&query")
#nvd_search_tool = WebsiteSearchTool(site_url="https://services.nvd.nist.gov/rest/json/cves/2.0?keywordSearch=")

@tool
def nvd_search_tool(keyword: str, limit: int = 5) -> str:
    """
    Searches the NVD API for vulnerabilities related to the provided keyword.

    Args:
        keyword (str): The search keyword to query CVEs in NVD.
        limit (int): The maximum number of CVEs to return. Defaults to 5.

    Returns:
        str: A formatted string of CVE IDs, CWE IDs and descriptions of vulnerabilities,
            or a message if nothing matched or the request failed.
    """
    # Function-scope import so the tool does not depend on a file-level import.
    from urllib.parse import quote

    # Percent-encode the keyword so characters such as '&', '#' or '+' cannot
    # corrupt the query string (spaces become %20).
    api_url = (
        "https://services.nvd.nist.gov/rest/json/cves/2.0"
        f"?keywordSearch={quote(keyword, safe='')}"
    )
    # Ask the server for only as many records as will be used instead of the
    # (much larger) default page; the API caps a page at 2000 records.
    if limit > 0:
        api_url += f"&resultsPerPage={min(limit, 2000)}"

    try:
        # A timeout keeps a stalled NVD endpoint from hanging the whole crew.
        response = requests.get(api_url, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers a non-JSON body on requests versions whose JSON
        # error is not a RequestException subclass.
        return f"Error fetching data from NVD API: {e}"

    cves = data.get("vulnerabilities", [])
    if not cves:
        return f"No vulnerabilities found for keyword '{keyword}'."

    output = []
    for cve_item in cves[:limit]:  # Limit the number of CVEs to the specified limit
        cve = cve_item.get("cve", {})
        cve_id = cve.get("id", "N/A")

        # Prefer the English description; fall back to the first one. An empty
        # or missing list yields the placeholder instead of an IndexError.
        descriptions = cve.get("descriptions") or []
        preferred = [d for d in descriptions if d.get("lang") == "en"] or descriptions
        if preferred:
            cve_description = preferred[0].get("value", "No description available.")
        else:
            cve_description = "No description available."

        # Collect the distinct CWE IDs in the order the API reports them.
        cwe_ids = []
        for weakness in cve.get("weaknesses", []):
            for description in weakness.get("description", []):
                cwe_id = description.get("value", "")
                if cwe_id and cwe_id not in cwe_ids:
                    cwe_ids.append(cwe_id)

        cwe_str = ", ".join(cwe_ids) if cwe_ids else "No CWE ID available"

        output.append(f"[CVE ID] {cve_id}\n[CWE ID] {cwe_str}\n[DESCRIPTION] {cve_description}\n")

    # Join the formatted CVE entries
    return "\n".join(output)

def format_cve_entry(row: Dict) -> str:
    """
    Render one CVE record as a four-line, bracket-labelled text block.

    Args:
        row (Dict): A mapping-like row (anything exposing ``.get``) holding the
            'CVE', 'Description', 'Chains' and 'Weakness_Description' fields.

    Returns:
        str: One "[Label] value" line per field, newline-separated, with 'N/A'
            substituted for any field that is absent from the row.
    """
    # (label shown in the output, key looked up in the row)
    labelled_fields = (
        ("CVE", "CVE"),
        ("Description", "Description"),
        ("Chains", "Chains"),
        ("Weakness Description", "Weakness_Description"),
    )
    return "\n".join(
        f"[{label}] {row.get(column, 'N/A')}" for label, column in labelled_fields
    )


@tool
def top25_search_tool(weakness: str, limit: int = 10) -> str:
    """
    Searches the Weakness_Description column of the Top 25 mapping CSV file for the given
    weakness string and returns related information from the matching rows.

    Args:
        weakness (str): The string to search for in the Weakness_Description column.
            Matched as a case-insensitive literal substring (not a regular expression).
        limit (int): The maximum number of results to return. Defaults to 10.

    Returns:
        str: A formatted string with CVE details for up to 'limit' matches, or a message if not found.
    """
    filename = './data_in/100_top25-mitre-mapping-analysis-2023-public_with_cve_descriptions.csv'
    try:
        csv_data = pd.read_csv(filename)
    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        return f"Error loading CSV file '{filename}': {e}"

    if 'Weakness_Description' not in csv_data.columns:
        return f"Error loading CSV file '{filename}': missing 'Weakness_Description' column"

    # regex=False: keyphrases such as "[CWE-122]" or "use (after free" contain
    # regex metacharacters and would otherwise raise re.error or match the wrong rows.
    matching_entries = csv_data[
        csv_data['Weakness_Description'].str.contains(weakness, case=False, na=False, regex=False)
    ]

    if matching_entries.empty:
        return f"No information found for weakness: {weakness} in file: {filename}."

    # Limit the results to the specified number of entries
    limited_entries = matching_entries.head(limit)

    # Format each matching row and separate the entries with a blank line
    return "\n\n".join(format_cve_entry(row) for _, row in limited_entries.iterrows())





In [9]:
# By default, the tool uses OpenAI for both embeddings and summarization.
# To customize the model, pass a config dictionary; see:
# https://docs.crewai.com/tools/CSVSearchTool/#installation

# Initialize the tool with a specific CSV file. This setup allows the agent to only search the given CSV file.
top25_semantic_search_tool = CSVSearchTool(csv='./data_in/100_top25-mitre-mapping-analysis-2023-public_with_cve_descriptions.csv')
# Alternative CSV. NOTE(review): if re-enabled, rename the variable -- `tool`
# would shadow the `tool` decorator imported from crewai_tools.
#tool = CSVSearchTool(csv='./data_in/top25-mitre-mapping-analysis-2023-public_with_cve_descriptions.csv')

In [10]:
#==================================================
# Prompts
#==================================================

# Function to read the system prompt from a file
def read_system_prompt(file_path: str) -> str:
    """Return the full text of the prompt file at *file_path*.

    The file is decoded as UTF-8 explicitly so that the prompt text does not
    depend on the platform's default encoding.

    Args:
        file_path (str): Path of the prompt file to read.

    Returns:
        str: The entire file content.

    Raises:
        OSError: If the file cannot be opened.
        UnicodeDecodeError: If the file is not valid UTF-8.
    """
    with open(file_path, 'r', encoding='utf-8') as file:
        return file.read()


# Load the four prompt texts from disk, in the order the names are listed.
(
    keyphrases_extractor_prompt,
    assign_cwe_prompt,
    create_report_prompt,
    create_report_output_prompt,
) = (
    read_system_prompt(prompt_path)
    for prompt_path in (
        './prompts/extract_key_entities/system.md',
        './prompts/assign_cwes/system.md',
        './prompts/create_report/system.md',
        './prompts/create_report/output.md',
    )
)



In [11]:

#==================================================
# Define the agents
#==================================================

# Agent that extracts the structured key phrases from the vulnerability
# description. It has no tools.
# NOTE(review): confirm that Agent honours a `system_prompt` keyword; if the
# installed CrewAI version ignores it, the prompt file loaded for this agent
# has no effect.
keyphrases_extractor = Agent(
    role='Keyphrases Extractor',
    goal='Extract key phrases from vulnerability descriptions',
    backstory='You are an expert in cybersecurity language and can identify crucial elements in vulnerability descriptions.',
    allow_delegation=False,
    llm=llm,
    max_iter=3,
    verbose=True,  # Enable verbose mode
    memory=True,   # Enable memory if needed
    system_prompt=keyphrases_extractor_prompt
)

# Agent meant to find similar CVEs among the CWE Observed Examples. It has no
# tools, and it is not in the agent list of the crew assembled in
# analyze_vulnerability (it appears only in the commented-out alternative there).
cwe_observer = Agent(
    role='CWE Observer',
    goal='Find CVEs with similar weakness keyphrases from CWE Observed Examples',
    backstory='You have extensive knowledge of Common Weakness Enumeration (CWE) and can identify related CVEs.',
    allow_delegation=False,
    llm=llm,
    max_iter=3,
    verbose=True,  # Enable verbose mode
    memory=True,   # Enable memory if needed
)

# Agent that searches the Top 25 CWE mapping CSV, using the exact-substring
# tool (top25_search_tool) and the semantic CSV tool (top25_semantic_search_tool).
# NOTE(review): the role string contains the typo "Resarcher"; it is left
# unchanged here because it is runtime text.
# NOTE(review): confirm that Agent honours a `system_prompt` keyword.
top25_cwe_analyst = Agent(
    role='Weakness Resarcher',
    goal='For each [ROOTCAUSE] and [WEAKNESS] keyphrase, search the top25 for similar [ROOTCAUSE] [WEAKNESS] keyphrases.',
    backstory='You are an expert in navigating the top25 and can find pertinent CVEs with similar [ROOTCAUSE] [WEAKNESS] keyphrases.',
    system_prompt="""You are an expert in navigating the top25 and can find pertinent CVEs with similar [ROOTCAUSE] [WEAKNESS] keyphrases.
    You will be provided with a list of [ROOTCAUSE] and [WEAKNESS] keyphrases and you need to search the CSV source for CVEs with similar [ROOTCAUSE] and [WEAKNESS] keyphrases.
    Results from the top25_search_tool are preferred, but the top25_semantic_search_tool is also acceptable.
    Maximize the number of results you return, but do not return more than 10 results.
    Provide the raw output from the top25_search_tool. 
    Provide the full rows for matching content as raw output from the top25_semantic_search_tool.
    
    """,
    allow_delegation=False,
    llm=llm,
    verbose=True,  # Enable verbose mode
    memory=True,   # Enable memory if needed
    max_iter=3,
    tools=[top25_search_tool, top25_semantic_search_tool],
)

# Agent that looks up similar CVEs in the National Vulnerability Database.
# It is given nvd_search_tool, so it queries the NVD REST API (an earlier note
# here said to rely on LLM training data instead; that no longer matches the code).
# NOTE(review): confirm that Agent honours a `system_prompt` keyword.
nvd_researcher = Agent(
    role='NVD Researcher',
    goal='For each [ROOTCAUSE] and [WEAKNESS] keyphrase, search the National Vulnerability Database for CVEs with similar [ROOTCAUSE] and [WEAKNESS] keyphrases.',
    backstory='You are an expert in navigating the National Vulnerability Database and can find pertinent CVEs with similar [ROOTCAUSE] and [WEAKNESS] keyphrases.',
    system_prompt="""You are an expert in navigating the National Vulnerability Database and can find pertinent CVEs with similar [ROOTCAUSE] and [WEAKNESS] keyphrases.
    You will be provided with a list of [ROOTCAUSE] and [WEAKNESS] keyphrases and you need to search the National Vulnerability Database for CVEs with similar [ROOTCAUSE] and [WEAKNESS] keyphrases.
    It does not matter if the [ROOTCAUSE] and [WEAKNESS] keyphrases are not exactly the same, as long as they are similar.
    It does not matter about the other keyphrases, they are not relevant.
    Make sure you only return CVEs that have a CWE-ID that starts with "CWE-". Ignore any CVEs that do not have a CWE-ID that starts with "CWE-" e.g. "NVD-CWE-Other", "NVD-CWE-Insufficient-Info"
    You will need to provide a list of CVE Descriptions and their CVE-IDs amd CWE-IDs that are similar to the [ROOTCAUSE] and [WEAKNESS] keyphrases and their CWE-IDs. 
    Provide the raw output from the nvd_search_tool. 
    """,
    allow_delegation=False,
    llm=llm,
    max_iter=3,
    verbose=True,
    memory=True,
    tools=[nvd_search_tool],  # Custom NVD API tool function (not a WebsiteSearchTool)
    )

# Agent that assembles the draft vulnerability report from the outputs of the
# earlier tasks. It has no tools.
# NOTE(review): confirm that Agent honours a `system_prompt` keyword.
report_creator = Agent(
    role='Report Creator',
    goal='Assemble the information provided to create a vulnerability report',
    backstory='You are skilled in organizing information from various sources to create a vulnerability report.',
    allow_delegation=False,
    llm=llm,
    max_iter=3,
    system_prompt=create_report_prompt, 
    verbose=True,  # Enable verbose mode
    memory=True,   # Enable memory if needed

)

# Agent meant to review and polish the draft report. It is not in the agent
# list of the crew assembled in analyze_vulnerability (it appears only in the
# commented-out alternative there).
report_reviewer = Agent(
    role='Report Reviewer',
    goal='Review and improve the vulnerability report',
    backstory='You have years of experience in reviewing and enhancing cybersecurity reports, ensuring they are accurate',
    allow_delegation=False,
    llm=llm,
    max_iter=3,
    verbose=True,  # Enable verbose mode
    memory=True,   # Enable memory if needed
)




#==================================================
# Define the tasks
#==================================================
# Each task writes its result to a file under OUTPUT_DIR.
# TODO vulnerability_description needs to be passed alone - not pre-defined
# (the f-strings below capture the module-level vulnerability_description once,
# when the Task objects are constructed).

# Step 1: extract the structured key phrases; the search tasks use this as context.
extract_key_phrases = Task(
    description=f"Extract key phrases from the following vulnerability description: {vulnerability_description} and provide the details in a structured format.",
    expected_output="The output should contain: [WEAKNESS], [PRODUCT], [VERSION], [ATTACKER], [IMPACT], [VECTOR], [ROOTCAUSE].",
    output_file=os.path.join(OUTPUT_DIR, "key_phrases.txt"),
    agent=keyphrases_extractor,
)

# Search of the CWE Observed Examples. Defined here but absent from the active
# task list of the crew built in analyze_vulnerability.
find_similar_cves_from_cwes_observed = Task(
    description="Find CVEs with similar weakness keyphrases from CWE Observed Examples using the extracted key phrases.",
    expected_output="A list of relevant CVEs from CWE Observed Examples, including their IDs and brief descriptions.",
    #async_execution=True,
    output_file=os.path.join(OUTPUT_DIR, "similar_cves_from_cwes_observed.txt"),
    agent=cwe_observer,
    context=[extract_key_phrases]
)

# Search of the Top 25 CWE mapping CSV; async execution is commented out for this task.
find_similar_cves_from_top25 = Task(
    description="Identify CVEs with similar weakness keyphrases from Top 25 CWE Mappings using the extracted key phrases.",
    expected_output="The raw output from the top25_cwe_analyst.",
    #async_execution=True,
    output_file=os.path.join(OUTPUT_DIR, "similar_cves_from_top25.txt"),
    agent=top25_cwe_analyst,
    context=[extract_key_phrases]
)

# Search of the NVD; the only task with async_execution enabled.
find_similar_cves_from_nvd = Task(
    description="Discover CVEs with similar weakness keyphrases from the National Vulnerability Database using the extracted key phrases.",
    expected_output="The raw output from the NVD search tool.",
    async_execution=True,
    output_file=os.path.join(OUTPUT_DIR, "similar_cves_from_nvd.txt"),
    agent=nvd_researcher,
    context=[extract_key_phrases]
)

# Draft report built from the key phrases plus the NVD and Top 25 results.
# expected_output is the text loaded from ./prompts/create_report/output.md.
create_vulnerability_report = Task(
    description=f"Create a report based on the vulnerability description: {vulnerability_description}, extracted key phrases, similar CVEs from NVD, similar CVEs from Top25.",
    #description="Create a comprehensive report based on the vulnerability description, extracted key phrases, and identified CVEs from all sources.",
    expected_output=create_report_output_prompt,
    output_file=os.path.join(OUTPUT_DIR, "vulnerability_report_draft.md"),
    agent=report_creator,
    context=[extract_key_phrases, find_similar_cves_from_nvd, find_similar_cves_from_top25]
    #context=[find_similar_cves_from_cwes_observed, find_similar_cves_from_top25, find_similar_cves_from_nvd]
)

# Review step. It declares no explicit context and is absent from the active
# task list of the crew built in analyze_vulnerability.
review_vulnerability_report = Task(
    description="Review the created report, provide feedback, and improve it if necessary.",
    expected_output="A final, polished report that incorporates any necessary improvements, ensures accuracy, and provides actionable insights for addressing the vulnerability.",
    output_file=os.path.join(OUTPUT_DIR, "vulnerability_report_final.md"),
    agent=report_reviewer,
)



In [None]:
# Entry point that assembles the crew and runs the vulnerability analysis
@with_langtrace_root_span()
def analyze_vulnerability(vulnerability_description: str) -> str:
    """Build the sequential crew, run it once and return the kickoff result.

    The active pipeline is: key-phrase extraction, NVD search, Top 25 search,
    report creation. cwe_observer / report_reviewer and their tasks are not
    part of this crew.

    Args:
        vulnerability_description (str): Not read inside this function; the
            task descriptions were already built from the module-level
            variable of the same name when the Task objects were constructed.

    Returns:
        Whatever ``Crew.kickoff()`` returns.
        NOTE(review): annotated as ``str`` -- confirm against the installed
        CrewAI version.
    """
    crew_settings = dict(
        agents=[keyphrases_extractor, nvd_researcher, top25_cwe_analyst, report_creator],
        tasks=[extract_key_phrases, find_similar_cves_from_nvd, find_similar_cves_from_top25, create_vulnerability_report],
        verbose=True,
        process=Process.sequential,
        output_log_file=os.path.join(OUTPUT_DIR, "output.log"),
    )
    return Crew(**crew_settings).kickoff()

# Example usage: analyse the module-level description and print the report
# under a heading line.
if __name__ == "__main__":
    final_report = analyze_vulnerability(vulnerability_description)
    print("Final Vulnerability Analysis Report:", final_report, sep="\n")
    