In [18]:
from dotenv import dotenv_values, load_dotenv
import os
import requests
import json
from typing import List, Dict
import pandas as pd
from anthropic import Anthropic
import time

# load .env file to environment
load_dotenv()

config = dotenv_values(".env")

# Set up API keys (replace with your actual API keys)
#os.environ["OPENAI_API_KEY"] = config['OPENAI_API_KEY']
os.environ["ANTHROPIC_API_KEY"] = config['ANTHROPIC_API_KEY']
#os.environ["GOOGLE_API_KEY"] = config['GOOGLE_API_KEY']
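
The cell above indexes `config` directly, so a key missing from `.env` surfaces as a bare `KeyError`. A minimal sketch of a more descriptive check follows. It assumes a hypothetical helper named `require_key` (not part of python-dotenv) and uses an in-memory dict in place of the mapping returned by `dotenv_values(".env")`.

```python
from typing import Mapping, Optional


def require_key(config: Mapping[str, Optional[str]], name: str) -> str:
    """Return config[name], or raise a descriptive error if it is missing or empty.

    `require_key` is a hypothetical helper for illustration; `config` can be any
    mapping, such as the one returned by dotenv_values(".env").
    """
    value = config.get(name)
    if not value:
        raise RuntimeError(f"{name} is not set; add it to your .env file")
    return value


# In-memory mapping standing in for dotenv_values(".env")
example_config = {"ANTHROPIC_API_KEY": "sk-example"}
api_key = require_key(example_config, "ANTHROPIC_API_KEY")
```

In the notebook this would be called as `require_key(config, "ANTHROPIC_API_KEY")` before the value is written to `os.environ`.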


In [21]:


# Initialize the Anthropic client
anthropic = Anthropic(api_key=os.environ["ANTHROPIC_API_KEY"])

# Load your DataFrame
df = pd.read_csv('./data_out/100_top25-mitre-mapping-analysis-2023-public_with_cve_descriptions.csv')

# Prepare the system prompt
system_prompt = """
# IDENTITY and PURPOSE
You are a vulnerability management expert and have a deep understanding of the Common Weakness Enumeration (CWE) framework, and the Common Vulnerabilities and Exposures (CVE) framework, and vulnerability descriptions.

You analyze vulnerability descriptions and extract the vulnerability key description phrases as named entities.

Given the BACKGROUND information below, take a step back and think step-by-step about how to achieve the best possible results by following the STEPS below.


## BACKGROUND

### Overview – What Is CWE?
Common Weakness Enumeration (CWE) is a community-developed list of common software and hardware weaknesses. A “weakness” is a condition in a software, firmware, hardware, or service component that, under certain circumstances, could contribute to the introduction of vulnerabilities. Referred to as CWEs, weaknesses are named, defined, and given a unique identifier on the CWE website. Weaknesses can occur in the design, implementation, or other phases of a product lifecycle. Many vulnerabilities have the same CWE as their root cause, independent of vendor, coding language, etc.

### Weakness versus Vulnerability Language
As defined by the CVE Program, a vulnerability is an instance of one or more weaknesses in a Product that can be exploited, causing a negative impact to confidentiality, integrity, or availability; a set of conditions or behaviors that allows the violation of an explicit or implicit security policy.

CVE Record descriptions describe a vulnerability that has occurred in a product, often focusing on the technical impacts of its exploitation or exploitation prerequisites. Examples of [IMPACT] phrases include 
1. “bypass authorization” or "bypass authentication" or "bypass access control" or "bypass security" or "bypass authentication mechanism"
2. “gain privileges”
3. “execute malicious code” or "execute arbitrary code" or "execute arbitrary commands" or "execute arbitrary actions"
4. "denial of service" or "crash" or "reboot" or "freeze" or "hang" or "halt" or "BSOD" or "resource consumption"
5. "information disclosure"
6. "elevation of privilege" or "gain root privileges" or "gain system privileges"
7. "Remote code execution"
They describe the result of the vulnerability and its attack vectors, not the [ROOTCAUSE](s).

Examples of exploitation prerequisite phrases include 
1. “unauthorized user”
2. “unauthenticated remote attacker”
3. “admin user”. 
While these phrases could be interpreted as being related to access control CWEs, they are not describing a weakness.

In contrast, accurately mapping a CVE Record to a CWE requires information describing an issue that led to the vulnerability. Examples of weakness language include 
1. “missing authentication”
2. “improper bounds check”
3. “stack-based buffer overflow”.

### Vulnerability Description 
The KeyEntities are listed in '[ ]'
Vulnerability descriptions often, but not always, follow one of the templates in the examples.
There may be multiple weaknesses in a single vulnerability description.


# STEPS
1. Extract the KeyEntities from the vulnerability description provided using only the words and phrases that are part of the KeyEntities.
2. Do not add any words or phrases that are not part of the KeyEntities.
3. Provide your answer in the following format:
    [WEAKNESS] <The identified weakness, not the technical impact. This should not include product details, only the root cause weakness>
    [VENDOR] <The vendor of the affected product>
    [PRODUCT] <The affected product>
    [COMPONENT] <The affected component>
    [VERSION] <The product version>
    [ATTACKER] <Type of attacker>
    [IMPACT] <Potential impact of the vulnerability>
    [VECTOR] <Attack vector>
    [ROOTCAUSE] <The rootcause weakness that led to the vulnerability. This should not include product details, only the root cause weakness>
4. Do not provide any additional text or comments or analysis.
5. Double check your work to ensure that the identified weakness is a true weakness and not just a technical impact.


# EXAMPLES

### Example 1 based on CVE-2024-21254
===Description===

Insecure Direct Object Reference (IDOR) in MyVendor MyProduct 10.1 to 10.6 allows an unauthenticated attacker to read sensitive data and execute specific commands and functions with full admin rights via the page parameter to the /api/xyz API endpoint.

````
===KeyEntities===
[COMPONENT] 
[VENDOR] MyVendor
[WEAKNESS] Insecure Direct Object Reference (IDOR)
[PRODUCT] MyProduct
[VERSION] 10.1 to 10.6
[ATTACKER] unauthenticated attacker
[IMPACT] read sensitive data and execute specific commands and functions with full admin rights
[VECTOR] the page parameter to the /api/xyz API endpoint
[ROOTCAUSE] Insecure Direct Object Reference (IDOR)
````
### Example 2 CVE-2019-3396 

===template===

[COMPONENT] in [VENDOR] [PRODUCT] [VERSION] allows [ATTACKER] to [IMPACT] via [VECTOR].

===Description===

The Widget Connector macro in Atlassian Confluence Server before version 6.6.12 (the fixed version for 6.6.x), from version 6.7.0 before 6.12.3 (the fixed version for 6.12.x), from version 6.13.0 before 6.13.3 (the fixed version for 6.13.x), and from version 6.14.0 before 6.14.2 (the fixed version for 6.14.x), allows remote attackers to achieve path traversal and remote code execution on a Confluence Server or Data Center instance via server-side template injection.

````
===KeyEntities===
[COMPONENT] Widget Connector macro
[VENDOR] Atlassian
[PRODUCT] Confluence Server
[VERSION] before version 6.6.12 (the fixed version for 6.6.x), from version 6.7.0 before 6.12.3 (the fixed version for 6.12.x), from version 6.13.0 before 6.13.3 (the fixed version for 6.13.x), and from version 6.14.0 before 6.14.2 (the fixed version for 6.14.x)
[ATTACKER] remote attackers
[IMPACT] achieve path traversal and remote code execution
[VECTOR] server-side template injection.
[ROOTCAUSE] 
[WEAKNESS] 
````

### Example 3 based on CVE-2020-3118
===template===

[COMPONENT] in [VENDOR] [PRODUCT] [VERSION] [ROOTCAUSE] allows [ATTACKER] to [IMPACT] via [VECTOR].

===Description===

The Cisco Discovery Protocol implementation in Cisco IOS XR Software performs improper validation of string input from certain fields, which could allow an unauthenticated, adjacent attacker to execute arbitrary code or cause a reload on an affected device. The vulnerability is due to improper validation of string input from certain fields in Cisco Discovery Protocol messages. An attacker could exploit this vulnerability by sending a malicious Cisco Discovery Protocol packet to an affected device.

````
===KeyEntities===
[COMPONENT] Cisco Discovery Protocol implementation
[VENDOR] Cisco
[PRODUCT] IOS XR Software
[VERSION] 
[ROOTCAUSE] improper validation of string input
[WEAKNESS] 
[ATTACKER] unauthenticated, adjacent attacker
[IMPACT] execute arbitrary code or cause a reload
[VECTOR] malicious Cisco Discovery Protocol packet
````

### Example 4 based on CVE-2021-4206

===Description===
A flaw was found in the QXL display device emulation in QEMU v1.2. An integer overflow in the cursor_alloc() function can lead to the allocation of a small cursor object followed by a subsequent heap-based buffer overflow. This flaw allows a malicious privileged guest user to crash the QEMU process on the host or potentially execute arbitrary code within the context of the QEMU process.

````
===KeyEntities===
[COMPONENT] QXL display device emulation
[PRODUCT] QEMU
[VERSION] v1.2
[ROOTCAUSE] integer overflow
[WEAKNESS] heap-based buffer overflow
[ATTACKER] a malicious privileged guest user
[IMPACT] crash the QEMU process on the host or potentially execute arbitrary code within the context of the QEMU process
[VECTOR] the cursor_alloc() function
````
"""
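
The prompt asks for one `[TAG] value` line per entity, which makes the reply easy to turn into structured data. A minimal sketch of such a parser is below. The function name `parse_key_entities` and the `TAGS` list are assumptions made for illustration; the tag names themselves are taken from the STEPS and EXAMPLES sections of the prompt.

```python
import re
from typing import Dict

# Tags that appear in the STEPS and EXAMPLES sections of the system prompt
TAGS = ["WEAKNESS", "VENDOR", "PRODUCT", "COMPONENT", "VERSION",
        "ATTACKER", "IMPACT", "VECTOR", "ROOTCAUSE"]

# A line of the form "[TAG] value"; the value may be empty
LINE_RE = re.compile(r"^\[(?P<tag>[A-Z]+)\]\s*(?P<value>.*)$")


def parse_key_entities(text: str) -> Dict[str, str]:
    """Parse '[TAG] value' lines into a dict; unknown tags and other lines are skipped."""
    entities: Dict[str, str] = {}
    for line in text.splitlines():
        match = LINE_RE.match(line.strip())
        if match and match.group("tag") in TAGS:
            entities[match.group("tag")] = match.group("value").strip()
    return entities


# Sample reply modelled on Example 4 in the prompt
sample = (
    "[COMPONENT] QXL display device emulation\n"
    "[PRODUCT] QEMU\n"
    "[VERSION] v1.2\n"
    "[ROOTCAUSE] integer overflow\n"
    "[WEAKNESS] heap-based buffer overflow\n"
)
parsed = parse_key_entities(sample)
```

If a tag occurs more than once in a reply, this sketch keeps only the last value; collecting values into a list per tag would be the natural extension for descriptions with several weaknesses.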



In [39]:
df

Unnamed: 0,CVE,CWE,Description,Chains,Weakness_Description,Analysis
0,CVE-2022-25258,CWE-476,An issue was discovered in drivers/usb/gadget/...,,lacks certain validation of interface OS descr...,Description 1:\n[COMPONENT] USB Gadget subsyst...
1,CVE-2022-25291,CWE-190,An integer overflow in WatchGuard Firebox and ...,CWE-190->CWE-122,An integer overflow ... trigger a heap-based b...,
2,CVE-2022-25308,CWE-121,A stack-based buffer overflow flaw was found i...,,A stack-based buffer overflow flaw,Description 1:\n[WEAKNESS] stack-based buffer ...
3,CVE-2022-25427,CWE-121,Tenda AC9 v15.03.2.21 was discovered to contai...,,contain a stack overflow,
4,CVE-2022-25428,CWE-121,Tenda AC9 v15.03.2.21 was discovered to contai...,,contain a stack overflow,Description 1:\n[COMPONENT] saveparentcontroli...
...,...,...,...,...,...,...
6240,CVE-2022-29897,CWE-20,On various RAD-ISM-900-EN-* devices by PHOENIX...,,due to an improper input validation,
6241,CVE-2022-29922,CWE-20,Improper Input Validation vulnerability in the...,,Improper Input Validation,
6242,CVE-2022-3001,CWE-20,This vulnerability exists in Milesight Video M...,,improper input handling,
6243,CVE-2022-30232,CWE-20,A CWE-20: Improper Input Validation vulnerabil...,,CWE-20: Improper Input Validation,
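
The output above shows that many rows still have an empty `Analysis` value. A minimal sketch of selecting only those rows, so that a rerun does not repeat work already done, is shown below. The small `demo` frame and its values are made up for illustration and stand in for `df`.

```python
import pandas as pd

# Small stand-in for the notebook's DataFrame (illustrative values only)
demo = pd.DataFrame({
    "CVE": ["CVE-A", "CVE-B", "CVE-C"],
    "Description": ["desc a", "desc b", "desc c"],
    "Analysis": ["[WEAKNESS] stack overflow", None, "   "],
})

# Treat missing values and whitespace-only strings as "not analyzed yet"
is_empty = demo["Analysis"].fillna("").astype(str).str.strip() == ""
pending = demo[is_empty]
pending_indices = pending.index.tolist()
```

Iterating over `pending.iterrows()` instead of the full frame keeps the original index labels, so results can still be written back with `df.at[index, 'Analysis']`.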


In [40]:
# Function to process a single description
def process_single(description):
    human_prompt = f"Here is the vulnerability description to analyze:\n\n{description}\n\nPlease analyze this description and provide the extracted KeyEntities as specified."
    
    response = anthropic.messages.create(
        model="claude-3-5-sonnet-20240620",
        max_tokens=2000,
        temperature=0,
        system=system_prompt,
        messages=[
            {"role": "user", "content": human_prompt}
        ]
    )
    
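    # response.content is a list of content blocks; return their text as one string
    return "".join(block.text for block in response.content if block.type == "text")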

# Function to save DataFrame to CSV
def save_dataframe(df, filename):
    df.to_csv(filename, index=False)
    print(f"Progress saved to {filename}")

# Process the DataFrame one entry at a time
total_rows = len(df)
results = []

for index, row in df.iterrows():
    description = row['Description']
    
    try:
        result = process_single(description)
        results.append(result)
        df.at[index, 'Analysis'] = result
    except Exception as e:
        print(f"Error processing entry {index}: {str(e)}")
        results.append(None)
        df.at[index, 'Analysis'] = None
    
    # Save progress every 100 entries
    if (index + 1) % 100 == 0:
        save_dataframe(df, f'analyzed_vulnerabilities_progress_{index+1}.csv')
        print(f"Processed {index+1}/{total_rows} entries")
    
    # Simple rate limiting (adjust as needed)
    time.sleep(1)

# Ensure all results are added to the DataFrame
df['Analysis'] = results

# Save the final updated DataFrame
final_filename = 'analyzed_vulnerabilities_final.csv'
save_dataframe(df, final_filename)

print(f"Analysis complete. Final results saved to '{final_filename}'.")







KeyboardInterrupt: 
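
The loop above records `None` as soon as a call raises, and pauses for a fixed second between requests. A minimal sketch of retrying a failed call with exponential backoff is given below. `call_with_retries` is a hypothetical helper, and the `func` argument stands in for `process_single`; neither the helper nor its parameters come from the Anthropic SDK.

```python
import time
from typing import Callable, Optional


def call_with_retries(
    func: Callable[[str], str],
    description: str,
    max_attempts: int = 3,
    base_delay: float = 1.0,
    sleep: Callable[[float], None] = time.sleep,
) -> Optional[str]:
    """Call func(description), retrying on any exception with exponential backoff.

    Returns None if every attempt fails, matching the loop's existing convention.
    """
    for attempt in range(max_attempts):
        try:
            return func(description)
        except Exception as exc:
            print(f"Attempt {attempt + 1} failed: {exc}")
            if attempt < max_attempts - 1:
                # Wait base_delay, then 2x, then 4x, ... before the next attempt
                sleep(base_delay * 2 ** attempt)
    return None
```

Inside the loop this would replace the direct call, for example `result = call_with_retries(process_single, description)`. The `sleep` parameter exists only so the delay can be swapped out when testing.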

In [38]:
# Function to process a batch of descriptions
def process_batch(descriptions):
    human_prompt = "Here are the vulnerability descriptions to analyze:\n\n"
    for i, desc in enumerate(descriptions, 1):
        human_prompt += f"Description {i}:\n{desc}\n\n"
    human_prompt += "Please analyze each description and provide the extracted KeyEntities as specified."
    
    response = anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=2000,
        temperature=0,
        system=system_prompt,
        messages=[
            {"role": "user", "content": human_prompt}
        ]
    )
    
    return response.content

# Function to parse batch results
def parse_batch_results(batch_results):
    if isinstance(batch_results, str):
        # If it's a string, split it as before
        individual_results = batch_results.split('Description')[1:]
        return [result.strip() for result in individual_results]
    elif isinstance(batch_results, list):
        # If it's a list, assume each item is a TextBlock
        return [block.text for block in batch_results]
    else:
        # If it's neither, raise an error
        raise ValueError(f"Unexpected type for batch_results: {type(batch_results)}")

# Function to save DataFrame to CSV
def save_dataframe(df, filename):
    df.to_csv(filename, index=False)
    print(f"Progress saved to {filename}")

# Process the DataFrame in batches
batch_size = 1
total_rows = len(df)
results = []

for start_idx in range(0, total_rows, batch_size):
    end_idx = min(start_idx + batch_size, total_rows)
    batch = df['Description'].iloc[start_idx:end_idx].tolist()
    
    batch_results = process_batch(batch)
    individual_results = parse_batch_results(batch_results)
    
    # Ensure we have the same number of results as the batch size
    if len(individual_results) != len(batch):
        print(f"Warning: Mismatch in results. Expected {len(batch)}, got {len(individual_results)}")
        # Pad with None if we have fewer results than expected
        individual_results.extend([None] * (len(batch) - len(individual_results)))
        # Truncate if we have more results than expected
        individual_results = individual_results[:len(batch)]
    
    results.extend(individual_results)
    
    # Save progress after each batch
    df.loc[start_idx:end_idx-1, 'Analysis'] = individual_results
    save_dataframe(df, f'analyzed_vulnerabilities_progress_{end_idx}.csv')
    print(f"Processed {end_idx}/{total_rows} entries")
    
    # Simple rate limiting (adjust as needed)
    time.sleep(2)

# Ensure all results are added to the DataFrame
df['Analysis'] = results

# Save the final updated DataFrame
final_filename = 'analyzed_vulnerabilities_final.csv'
save_dataframe(df, final_filename)

print(f"Analysis complete. Final results saved to '{final_filename}'.")



KeyboardInterrupt: 

In [32]:
# Function to process a batch of descriptions
def process_batch(descriptions):
    human_prompt = "Here are the vulnerability descriptions to analyze:\n\n"
    for i, desc in enumerate(descriptions, 1):
        human_prompt += f"Description {i}:\n{desc}\n\n"
    human_prompt += "Please analyze each description and provide the extracted KeyEntities as specified."
    
    response = anthropic.messages.create(
        model="claude-3-5-sonnet-20240620",
        max_tokens=2000,
        temperature=0,
        system=system_prompt,
        messages=[
            {"role": "user", "content": human_prompt}
        ]
    )
    
    return response.content

# Function to parse batch results
def parse_batch_results(batch_results):
    individual_results = batch_results.split('Description')[1:]
    return [result.strip() for result in individual_results]

# Function to save DataFrame to CSV
def save_dataframe(df, filename):
    df.to_csv(filename, index=False)
    print(f"Progress saved to {filename}")

# Process the DataFrame in batches
batch_size = 25
total_rows = len(df)
results = []

for start_idx in range(0, total_rows, batch_size):
    end_idx = min(start_idx + batch_size, total_rows)
    batch = df['Description'].iloc[start_idx:end_idx].tolist()
    
    batch_results = process_batch(batch)
    individual_results = parse_batch_results(batch_results)
    results.extend(individual_results)
    
    # Save progress after each batch
    df.loc[start_idx:end_idx-1, 'Analysis'] = individual_results
    save_dataframe(df, f'analyzed_vulnerabilities_progress_{end_idx}.csv')
    print(f"Processed {end_idx}/{total_rows} entries")
    
    # Simple rate limiting (adjust as needed)
    time.sleep(2)

# Ensure all results are added to the DataFrame
df['Analysis'] = results

# Save the final updated DataFrame
final_filename = 'analyzed_vulnerabilities_final.csv'
save_dataframe(df, final_filename)

print(f"Analysis complete. Final results saved to '{final_filename}'.")



AttributeError: 'list' object has no attribute 'split'
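
The error arises because `response.content` is a list of content blocks rather than a string, so `.split` is not available on it. A minimal sketch of joining the block text first is shown below. `FakeTextBlock` is a stand-in defined here so the example runs without an API call, and `blocks_to_text` is a hypothetical helper name.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class FakeTextBlock:
    """Stand-in for the SDK's TextBlock so this sketch runs without an API call."""
    text: str
    type: str = "text"


def blocks_to_text(blocks: List[FakeTextBlock]) -> str:
    """Concatenate the text of every text-type block into one string."""
    return "".join(block.text for block in blocks if block.type == "text")


# A reply shaped like the notebook's batch output, reduced to one description
content = [FakeTextBlock(text="Description 1:\n[WEAKNESS] stack overflow")]
text = blocks_to_text(content)

# The original split now works because `text` is a plain string
parts = [p.strip() for p in text.split("Description")[1:]]
```

Note that splitting on the bare word `Description` leaves the number and colon at the start of each part, and would also split on any later occurrence of that word inside a reply.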

In [None]:
# Function to process a batch of descriptions
def process_batch(descriptions):
    human_prompt = "Here are the vulnerability descriptions to analyze:\n\n"
    for i, desc in enumerate(descriptions, 1):
        human_prompt += f"Description {i}:\n{desc}\n\n"
    human_prompt += "Please analyze each description and provide the extracted KeyEntities as specified."
    
    response = anthropic.messages.create(
        model="claude-3-sonnet-20240229",
        max_tokens=2000,
        temperature=0,
        system=system_prompt,
        messages=[
            {"role": "user", "content": human_prompt}
        ]
    )
    
    return response.content

import re

# Function to parse batch results
def parse_batch_results(batch_results):
    # The API returns a list of content blocks (TextBlock); join their text first
    if isinstance(batch_results, list):
        batch_results = "".join(block.text for block in batch_results)
    elif not isinstance(batch_results, str):
        raise ValueError(f"Unexpected type for batch_results: {type(batch_results)}")
    # Split on the "Description N:" headers, dropping any text before the first one.
    # This assumes the model echoes those headers in its reply.
    individual_results = re.split(r'Description \d+:', batch_results)[1:]
    return [result.strip() for result in individual_results]

# Function to save DataFrame to CSV
def save_dataframe(df, filename):
    df.to_csv(filename, index=False)
    print(f"Progress saved to {filename}")

# Process the DataFrame in batches
batch_size = 25
total_rows = len(df)
results = []

for start_idx in range(0, total_rows, batch_size):
    end_idx = min(start_idx + batch_size, total_rows)
    batch = df['Description'].iloc[start_idx:end_idx].tolist()
    
    batch_results = process_batch(batch)
    individual_results = parse_batch_results(batch_results)
    results.extend(individual_results)
    
    # Save progress after each batch
    df.loc[start_idx:end_idx-1, 'Analysis'] = individual_results
    save_dataframe(df, f'analyzed_vulnerabilities_progress_{end_idx}.csv')
    print(f"Processed {end_idx}/{total_rows} entries")
    
    # Simple rate limiting (adjust as needed)
    time.sleep(2)

# Ensure all results are added to the DataFrame
df['Analysis'] = results

# Save the final updated DataFrame
final_filename = 'analyzed_vulnerabilities_final.csv'
save_dataframe(df, final_filename)

print(f"Analysis complete. Final results saved to '{final_filename}'.")

In [26]:
# Model used in this cell: claude-3-5-sonnet-20240620


# Function to process a batch of descriptions
def process_batch(descriptions):
    human_prompt = "Here are the vulnerability descriptions to analyze:\n\n"
    for i, desc in enumerate(descriptions, 1):
        human_prompt += f"Description {i}:\n{desc}\n\n"
    human_prompt += "Please analyze each description and provide the extracted KeyEntities as specified."
    
    response = anthropic.messages.create(
        model="claude-3-5-sonnet-20240620",
        max_tokens=2000,
        temperature=0,
        system=system_prompt,
        messages=[
            {"role": "user", "content": human_prompt}
        ]
    )
    
    return response.content

# Function to save DataFrame to CSV
def save_dataframe(df, filename):
    df.to_csv(filename, index=False)
    print(f"Progress saved to {filename}")

# Process the DataFrame in batches
batch_size = 25
total_rows = len(df)
results = []

for start_idx in range(0, total_rows, batch_size):
    end_idx = min(start_idx + batch_size, total_rows)
    batch = df['Description'].iloc[start_idx:end_idx].tolist()
    
    batch_results = process_batch(batch)
    individual_results = batch_results.split('Description')[1:]  # Split the results for each description
    results.extend(individual_results)
    
    # Save progress after each batch
    df.loc[start_idx:end_idx-1, 'Analysis'] = results[-len(batch):]
    save_dataframe(df, f'analyzed_vulnerabilities_progress_{end_idx}.csv')
    print(f"Processed {end_idx}/{total_rows} entries")
    
    # Simple rate limiting (adjust as needed)
    time.sleep(2)

# Ensure all results are added to the DataFrame
df['Analysis'] = results

# Save the final updated DataFrame
final_filename = 'analyzed_vulnerabilities_final.csv'
save_dataframe(df, final_filename)

print(f"Analysis complete. Final results saved to '{final_filename}'.")

AttributeError: 'list' object has no attribute 'split'
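
Once the reply text is a plain string, the batch still has to be mapped back to individual rows. A minimal sketch that splits on numbered `Description N:` headers and places each chunk at position N-1 is shown below, so a skipped description leaves a `None` rather than shifting later results. `split_by_description` is a hypothetical helper, and the approach assumes the model echoes those headers, which the system prompt does not explicitly require.

```python
import re
from typing import List, Optional


def split_by_description(text: str, expected: int) -> List[Optional[str]]:
    """Split on 'Description N:' headers and place each chunk at position N-1.

    Entries the model skipped stay None, so the list always has `expected` items.
    """
    results: List[Optional[str]] = [None] * expected
    # re.split with a capture group yields [preamble, "1", chunk1, "2", chunk2, ...]
    pieces = re.split(r"Description (\d+):", text)
    for number, chunk in zip(pieces[1::2], pieces[2::2]):
        position = int(number) - 1
        if 0 <= position < expected:
            results[position] = chunk.strip()
    return results


# Reply in which the model skipped the second description
reply = (
    "Description 1:\n[WEAKNESS] integer overflow\n\n"
    "Description 3:\n[WEAKNESS] stack overflow"
)
aligned = split_by_description(reply, expected=3)
```

Because the returned list always has `expected` entries, it can be assigned to `df.loc[start_idx:end_idx-1, 'Analysis']` without a length mismatch.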

In [31]:
batch_results

[TextBlock(text='[COMPONENT] USB Gadget subsystem\n[PRODUCT] Linux kernel\n[VERSION] before 5.16.10\n[WEAKNESS] lack of validation\n[ROOTCAUSE] lack of validation of interface OS descriptor requests\n[IMPACT] Memory corruption\n[VECTOR] interface OS descriptor requests\n\n[WEAKNESS] integer overflow\n[PRODUCT] WatchGuard Firebox and XTM appliances\n[VERSION] Fireware OS before 12.7.2_U2, 12.x before 12.1.3_U8, and 12.2.x through 12.5.x before 12.5.9_U2\n[ATTACKER] authenticated remote attacker\n[IMPACT] trigger a heap-based buffer overflow and potentially execute arbitrary code\n[VECTOR] initiating a firmware update with a malicious upgrade image\n[ROOTCAUSE] integer overflow\n\n[WEAKNESS] stack-based buffer overflow\n[PRODUCT] Fribidi package\n[IMPACT] memory leak or a denial of service\n[VECTOR] specially crafted file\n[ROOTCAUSE] stack-based buffer overflow\n\n[WEAKNESS] stack overflow\n[PRODUCT] Tenda AC9\n[VERSION] v15.03.2.21\n[COMPONENT] openSchedWifi function\n[VECTOR] schedend