# Run CTIBench on Furiosa RNGD
CTIBench is a benchmark for evaluating LLMs on Cyber Threat Intelligence (CTI) tasks.

This notebook is derived from the CTIBench evaluation notebook (https://github.com/xashru/cti-bench/blob/main/evaluation/model-prediction.ipynb), which runs the benchmark on several other models.

Benchmark details: https://proceedings.neurips.cc/paper_files/paper/2024/file/5acd3c628aa1819fbf07c39ef73e7285-Paper-Datasets_and_Benchmarks_Track.pdf

The benchmark evaluates LLMs on four tasks:
- CTI-MCQ: Cyber Threat Intelligence Multiple Choice Questions, which test the cybersecurity knowledge of LLMs
- CTI-RCM: Cyber Threat Intelligence Root Cause Mapping
- CTI-VSP: Cyber Threat Intelligence Vulnerability Severity Prediction
- CTI-TAA: Cyber Threat Intelligence Threat Actor Attribution

In [1]:
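# Ensure required packages are available before imports
import importlib
import subprocess
import sys

def ensure_package(pkg: str, module: str = None):
    """Install `pkg` with pip if `module` (default: `pkg`) cannot be imported."""
    try:
        importlib.import_module(module or pkg)
    except ImportError:
        subprocess.check_call([sys.executable, "-m", "pip", "install", pkg])

# Core dependencies for this notebook, as (pip name, import name) pairs.
# python-dotenv is imported as `dotenv`, so the two names differ.
for pkg, module in [
    ("openai", "openai"),
    ("python-dotenv", "dotenv"),
    ("pandas", "pandas"),
    ("numpy", "numpy"),
    ("matplotlib", "matplotlib"),
]:
    ensure_package(pkg, module)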




In [2]:
from openai import OpenAI
import pandas as pd
import numpy as np
import os
import time
import re
import matplotlib.pyplot as plt

## Setup all APIs

In [3]:
import os
from dotenv import load_dotenv, find_dotenv

# Load .env from current working directory or nearest parent
load_dotenv(find_dotenv(usecwd=True))

True

In [4]:
# Select the backend by its logical name; model_mapping (below) maps that name
# to the model id sent to the API. samples_to_run limits how many rows are evaluated per task.
model_name = 'rngd'
samples_to_run = 100

if model_name == 'gpt4':
    openai_client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
elif model_name == 'rngd':
    openai_client = OpenAI(api_key="EMPTY", base_url="http://localhost:8080/v1")
else:
    raise ValueError(f"Model {model_name} not supported")

model_mapping = {
    'rngd': 'EMPTY',
    'gpt4': 'gpt-4-turbo'
}
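
The `'rngd'` entry maps to the placeholder id `'EMPTY'`. If the local server expects a specific model name instead, one option is to ask the OpenAI-compatible endpoint which models it serves. The sketch below assumes the server implements the `/v1/models` route; `first_served_model` is a hypothetical helper and not part of the original notebook.

```python
def first_served_model(client):
    """Return the id of the first model the endpoint reports, or None.

    `client` is expected to behave like `openai.OpenAI`: `client.models.list()`
    returns a page object whose `.data` is a list of objects with an `.id`.
    """
    models = client.models.list().data
    return models[0].id if models else None

# Hypothetical usage in this notebook (requires the local server to be running):
# model_mapping['rngd'] = first_served_model(openai_client) or 'EMPTY'
```

Falling back to `'EMPTY'` keeps the original behavior when the endpoint reports no models.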

## Prediction Params & Method

In [5]:
# set parameters for more deterministic output
temperature = 0
top_p = 1
seed = 42
max_tokens = 2048

In [6]:
sys_prompt = 'You are a cybersecurity expert specializing in cyberthreat intelligence.'

In [7]:
def get_single_prediction(question, model_name):
    
    model = model_mapping[model_name]
    response = openai_client.chat.completions.create(
        model=model,
        messages=[
            {'role': 'system', 'content': sys_prompt},
            {'role': 'user', 'content': question}
        ],
        temperature=temperature,
        top_p=top_p,
        max_tokens=max_tokens,
        seed=seed
    )
    output = response.choices[0].message.content
    time.sleep(1)   # pause between requests so rapid successive calls do not trigger API errors
    return output
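
The fixed one-second pause above spaces out requests but does not recover from a call that actually fails. A minimal sketch of a retry wrapper with exponential backoff follows; `call_with_retries` and its parameters are hypothetical names introduced here, and retrying on every exception is a simplifying assumption (a real setup would likely retry only on rate-limit or connection errors).

```python
import time

def call_with_retries(fn, *args, max_attempts=3, base_delay=1.0, sleep=time.sleep, **kwargs):
    """Call fn(*args, **kwargs); on an exception, wait and retry.

    The wait doubles after each failed attempt (base_delay, 2*base_delay, ...).
    The last exception is re-raised once max_attempts is exhausted.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            return fn(*args, **kwargs)
        except Exception:
            if attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))

# Hypothetical usage with the notebook's function:
# output = call_with_retries(get_single_prediction, question, model_name)
```

The `sleep` parameter exists only so the delay can be replaced in tests.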


#### Test

In [8]:
question = (
    "Analyze the following CVE description and map it to the appropriate CWE. "
    "Provide a brief justification. The last line of your answer should only contain the CWE ID.\n\n"
    "CVE Description:\n\n"
    "Dell EMC CloudLink 7.1 and all prior versions contain an Improper Input Validation Vulnerability. "
    "A remote low privileged attacker may potentially exploit this vulnerability, "
    "leading to execution of arbitrary files on the server."
)

##### Are all the APIs working?

In [9]:
print(get_single_prediction(question, model_name))

The CVE description indicates that the vulnerability arises from improper input validation, which allows an attacker to execute arbitrary files on the server. This type of vulnerability is typically associated with the following Common Weakness Enumeration (CWE):

CWE-20: Improper Input Validation

This CWE ID is the most appropriate mapping for the given CVE description.


# Run Evaluation for a Dataset

### All formatting comes here
While these functions capture most of the output formats of the LLMs we studied, we still had to manually collect some answers from the generated response file.

In [10]:
def format_rcm(text):
    # Define the regex pattern for CWE ID
    cwe_pattern = r'CWE-\d+'

    # Find all matches in the text
    matches = re.findall(cwe_pattern, text)

    # Return the last match if any match is found, otherwise return the original text
    if matches:
        return matches[-1], True
    else:
        return text, False

def format_vsp(text):
    # Define the regex pattern for CVSS v3.1 vector string
    #cvss_pattern = r'AV:[^/]+?/AC:[^/]+?/PR:[^/]+?/UI:[^/]+?/S:[^/]+?/C:[^/]+?/I:[^/]+?/A:[^/]+?'
    cvss_pattern = r'AV:[A-Za-z]+/AC:[A-Za-z]+/PR:[A-Za-z]+/UI:[A-Za-z]+/S:[A-Za-z]+/C:[A-Za-z]+/I:[A-Za-z]+/A:[A-Za-z]+'


    # Find all matches in the text
    matches = re.findall(cvss_pattern, text)

    # Return the last match if any match is found, otherwise return the original text
    if matches:
        return matches[-1], True
    else:
        return text, False

def format_mcq(text):
    # Inspect the last line, falling back to the second-to-last line if the last one is empty
    lines = text.split('\n')
    last_line = lines[-1].rstrip()
    if len(last_line) == 0:
        last_line = lines[-2].rstrip()
    if last_line.startswith(('A)', 'B)', 'C)', 'D)')):
        return last_line[0]
    if last_line.endswith(('A', 'B', 'C', 'D')):
        return last_line[-1]
    if last_line.endswith('**'):
        return last_line[-3]
    # No recognizable answer letter: return the whole response on a single line
    return ' '.join(text.split('\n'))

def format_taa(text):
    # need to manually extract the attribution
    return ' '.join(text.split('\n'))
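
Some responses name the weakness in a form that the strict `CWE-\d+` pattern misses, for example `CWE ID: 918`. The sketch below shows one way to accept a few such variants and normalize them to `CWE-<n>`. `format_rcm_loose` is a hypothetical alternative to `format_rcm`, and the set of accepted spellings is an assumption, not an exhaustive list.

```python
import re

# Accepts "CWE-918", "CWE 918", "CWE ID: 918", "cwe_id=918" (case-insensitive).
_CWE_LOOSE = re.compile(r'CWE(?:[\s_-]*ID)?\s*[:=#-]?\s*(\d+)', re.IGNORECASE)

def format_rcm_loose(text):
    """Return ('CWE-<n>', True) for the last CWE mention, else (text, False)."""
    numbers = _CWE_LOOSE.findall(text)
    if numbers:
        return 'CWE-' + numbers[-1], True
    return text, False
```

A looser pattern reduces manual cleanup but also raises the chance of picking up a CWE the model mentioned only in passing, so the last-match rule from `format_rcm` is kept.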

In [11]:
# Baseline version that processes every row; the next cell redefines it with an optional sample limit

def run_evaluation(file_path, task, model_name):
    # Keep track of time and total #chars generated
    start_time = time.time()
    count_chars = 0
    instructions_failed = 0
    
    data = pd.read_csv(file_path, encoding='utf-8', sep='\t')

    # all_responses holds the full model responses, all_results the formatted answers
    all_responses = []
    all_results = []
    
    for index, row in data.iterrows():
        prompt = row['Prompt']
        try:
            output = get_single_prediction(prompt, model_name)
            
            count_chars += len(output)
            all_responses.append(output)
            if task == 'rcm':
                answer, success = format_rcm(output)
                if not success:
                    instructions_failed += 1
            elif task == 'vsp':
                answer, success = format_vsp(output)
                if not success:
                    instructions_failed += 1      
            elif task == 'mcq':
                answer = format_mcq(output)
            elif task == 'taa':
                answer = format_taa(output)
            else:
                raise ValueError('Task unknown!')
        except Exception as e:
            answer = 'Error'
            all_responses.append(answer)
            print('Exception at row ', index+1)
            print(e)
        all_results.append(answer)
        print(index+1, answer)
        # print(index+1)


    time_taken = time.time() - start_time
    print('Time taken:', time_taken)
    print('#Characters generated:', count_chars)
    print('#Instructions failed:', instructions_failed)

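    # Save all the responses & results next to the dataset file.
    # os.path.splitext is used because file_path.split('.')[0] is '' for paths starting with '../'
    base = os.path.splitext(file_path)[0]
    out_response = base + '_' + model_name + '_response.txt'
    out_result = base + '_' + model_name + '_result.txt'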

    with open(out_response, 'w', encoding='utf-8') as f:
        out_str = ''
        for i in range(len(all_responses)):
            out_str += '#####' + str(i+1) + '#####\n'
            out_str += all_responses[i]
            out_str += '\n\n'
        f.write(out_str)
    with open(out_result, 'w', encoding='utf-8') as f:
        f.write('\n'.join(all_results))

    print('------- Done --------')

In [12]:
def run_evaluation(file_path, task, model_name, num_samples=None):
    """Run evaluation over the dataset.

    Args:
        file_path: Path to TSV dataset with a 'Prompt' column.
        task: One of {'rcm', 'vsp', 'mcq', 'taa'} controlling output formatting.
        model_name: Logical model key used by get_single_prediction.
        num_samples: Optional int; if provided and >0, only process the first N rows.
            If None or invalid/non-positive, process all rows (default behavior).
    """
    # Keep track of time and total #chars generated
    start_time = time.time()
    count_chars = 0
    instructions_failed = 0

    data = pd.read_csv(file_path, encoding='utf-8', sep='\t')

    # Determine subset
    n = None
    if num_samples is not None:
        try:
            n = int(num_samples)
        except (TypeError, ValueError):
            n = None
    if n is not None and n > 0:
        sample_data = data.head(n).reset_index(drop=True)
    else:
        sample_data = data.reset_index(drop=True)

    # all_responses holds the full model responses, all_results the formatted answers
    all_responses = []
    all_results = []

    for index, row in sample_data.iterrows():
        prompt = row['Prompt']
        output = None
        try:
            output = get_single_prediction(prompt, model_name)

            count_chars += len(output)
            if task == 'rcm':
                answer, success = format_rcm(output)
                if not success:
                    instructions_failed += 1
            elif task == 'vsp':
                answer, success = format_vsp(output)
                if not success:
                    instructions_failed += 1
            elif task == 'mcq':
                answer = format_mcq(output)
            elif task == 'taa':
                answer = format_taa(output)
            else:
                raise ValueError('Task unknown!')
        except Exception as e:
            answer = 'Error'
            print('Exception at row ', index + 1)
            print(e)
        # Store exactly one response per row so responses and results stay aligned,
        # even when formatting fails after a successful generation
        all_responses.append(output if output is not None else answer)
        all_results.append(answer)
        print(index + 1, answer)

    time_taken = time.time() - start_time
    print('Time taken:', time_taken)
    print('#Characters generated:', count_chars)
    print('#Instructions failed:', instructions_failed)

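    # Save all the responses & results next to the dataset file.
    # os.path.splitext is used because file_path.split('.')[0] is '' for paths starting with '../'
    base = os.path.splitext(file_path)[0]
    out_response = base + '_' + model_name + '_response.txt'
    out_result = base + '_' + model_name + '_result.txt'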

    with open(out_response, 'w', encoding='utf-8') as f:
        out_str = ''
        for i in range(len(all_responses)):
            out_str += '#####' + str(i + 1) + '#####\n'
            out_str += all_responses[i]
            out_str += '\n\n'
        f.write(out_str)
    with open(out_result, 'w', encoding='utf-8') as f:
        f.write('\n'.join(all_results))

    print('------- Done --------')
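
`run_evaluation` writes predictions to disk but does not score them. A minimal sketch of exact-match scoring for the MCQ and RCM tasks follows. `exact_match_accuracy` is a hypothetical helper, and the ground-truth column name `GT` in the usage comment is an assumption about the dataset layout that should be checked against the TSV files.

```python
def exact_match_accuracy(predictions, ground_truth):
    """Fraction of predictions equal to the ground truth after trimming and upper-casing."""
    if len(predictions) != len(ground_truth):
        raise ValueError('predictions and ground_truth must have the same length')
    if not predictions:
        return 0.0
    hits = sum(str(p).strip().upper() == str(g).strip().upper()
               for p, g in zip(predictions, ground_truth))
    return hits / len(predictions)

# Hypothetical usage, assuming a 'GT' column and a list `preds` with one answer
# per evaluated row (in the same order as the first len(preds) dataset rows):
# data = pd.read_csv('../data/cti-mcq.tsv', sep='\t')
# print(exact_match_accuracy(preds, data['GT'].head(len(preds)).tolist()))
```

Reading `preds` back from the result file is only safe when every answer is a single line, since unformatted answers can contain newlines.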


In [None]:
# CTI-MCQ: Cyber Threat Intelligence Multiple Choice Questions - test the knowledge of LLMs in cybersecurity
run_evaluation('../data/cti-mcq.tsv', 'mcq', model_name, num_samples=samples_to_run)

1 B
2 C
3 C
4 A
5 C
6 B
7 D
8 A
9 D
10 D
11 D
12 D
13 A
14 B
15 D
16 B
17 A
18 B
19 A
20 D
21 D
22 A
23 B
24 C
25 D
26 D
27 A
28 C
29 B
30 A
31 A
32 D
33 A
34 D
35 C
36 D
37 D
38 B
39 B
40 C
41 C
42 D
43 D
44 D
45 C
46 C
47 D
48 B
49 D
50 D
51 B
52 B
53 C
54 D
55 D
56 A
57 B
58 B
59 C
60 C
61 A
62 C
63 B
64 A
65 B
66 D
67 C
68 D
69 D
70 A
71 B
72 A
73 B
74 D
75 B
76 D
77 B
78 B
79 A
80 B
81 B
82 B
83 A
84 A
85 D
86 D
87 D
88 B
89 C
90 C
91 B
92 D
93 C
94 A
95 A
96 B
97 A
98 B
99 C
100 D
Time taken: 256.02747631073
#Characters generated: 100
#Instructions failed: 0
------- Done --------


In [None]:
# CTI-RCM: Cyber Threat Intelligence Root Cause Mapping (RCM)
# RCM identifies the underlying cause(s) of a vulnerability by correlating CVE records and bug tickets with CWE entries
run_evaluation('../data/cti-rcm.tsv', 'rcm', model_name, num_samples=samples_to_run)

1 CWE-416
2 CWE-287
3 CWE-79
4 CWE-787
5 CWE-79
6 CWE-79
7 CWE-287
8 CWE ID: 918
9 CWE-79
10 CWE-532
11 CWE-79
12 CWE-122
13 CWE-416
14 CWE-78
15 CWE-306
16 CWE-200
17 CWE-434
18 CWE-89
19 CWE-434
20 CWE-79
21 CWE-78
22 CWE-89
23 The CVE description indicates a vulnerability in the rendering of user comments in the flaskBlog application. The vulnerability allows for the execution of arbitrary JavaScript code due to improper storage and rendering of the `/user/<user>` page.

The root cause of this vulnerability is the use of the `|safe` filter in the Jinja2 template engine. This filter tells Flask to not escape the rendered content, which in this case leads to the execution of arbitrary JavaScript code.

To remediate this vulnerability, the `|safe` filter should be removed from the HTML template `user.html`. This change will ensure that any user input is properly escaped and cannot be used to execute arbitrary JavaScript code.

No official fix is available from the flaskBlog maintainers

In [None]:
# CTI-VSP: Cyber Threat Intelligence Vulnerability Severity Prediction
run_evaluation('../data/cti-vsp.tsv', 'vsp', model_name, num_samples=samples_to_run)

1 AV:N/AC:L/PR:N/UI:N/S:C/C:H/I:H/A:H
2 To calculate the CVSS v3.1 Base Score for the given CVE description, we need to analyze the provided information and assign values to each of the base metrics.

**CVE Description Analysis:**

- **Attack Vector (AV)**: The vulnerability can be exploited remotely, as the attacker can gain unauthorized access to other OpenPages accounts. This corresponds to **Network (N)**.
  
- **Attack Complexity (AC)**: The attack complexity is likely to be low, as the attacker only needs to exploit the vulnerability in the OpenPages environment using Native authentication. This corresponds to **Low (L)**.

- **Privileges Required (PR)**: The vulnerability requires no special privileges to exploit, as an attacker with access to the OpenPages database could exploit the weakness. This corresponds to **None (N)**].

- **User Interaction (UI)**: The vulnerability does not require user interaction to be exploited, as the attacker can gain unauthorized access through a
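
A vector string such as the one extracted for row 1 can be converted to a numeric CVSS v3.1 base score, which makes predicted and reference severities comparable by absolute difference. The sketch below follows the base-score equations and metric weights of the CVSS v3.1 specification; `cvss31_base_score` is a hypothetical helper, and it assumes a well-formed vector containing all eight base metrics.

```python
import math

# Base-metric weights from the CVSS v3.1 specification.
_W = {
    'AV': {'N': 0.85, 'A': 0.62, 'L': 0.55, 'P': 0.2},
    'AC': {'L': 0.77, 'H': 0.44},
    'UI': {'N': 0.85, 'R': 0.62},
    'CIA': {'H': 0.56, 'L': 0.22, 'N': 0.0},
}
# Privileges Required depends on whether Scope is changed.
_PR = {False: {'N': 0.85, 'L': 0.62, 'H': 0.27},
       True: {'N': 0.85, 'L': 0.68, 'H': 0.5}}

def _roundup(x):
    """Round up to one decimal place, as defined in the CVSS v3.1 specification."""
    i = round(x * 100000)
    return i / 100000.0 if i % 10000 == 0 else (math.floor(i / 10000) + 1) / 10.0

def cvss31_base_score(vector):
    """Compute the base score for a vector such as 'AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H'."""
    # An optional leading 'CVSS:3.1' component is skipped.
    m = dict(part.split(':') for part in vector.split('/') if not part.startswith('CVSS'))
    changed = m['S'] == 'C'
    cia = _W['CIA']
    iss = 1 - (1 - cia[m['C']]) * (1 - cia[m['I']]) * (1 - cia[m['A']])
    if changed:
        impact = 7.52 * (iss - 0.029) - 3.25 * (iss - 0.02) ** 15
    else:
        impact = 6.42 * iss
    expl = 8.22 * _W['AV'][m['AV']] * _W['AC'][m['AC']] * _PR[changed][m['PR']] * _W['UI'][m['UI']]
    if impact <= 0:
        return 0.0
    total = 1.08 * (impact + expl) if changed else impact + expl
    return _roundup(min(total, 10))
```

A malformed or incomplete vector raises `KeyError` or `ValueError` here, which a caller could treat as a failed prediction.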

In [17]:
#  CTI-TAA: Cyber Threat Intelligence Threat Actor Attribution (TAA)
run_evaluation('../data/cti-taa.tsv', 'taa', model_name, num_samples=samples_to_run)

1 The threat intelligence team's latest telemetry data. They have a series of cyber-incident incidents. The threat intelligence team's latest telemetry analysis and tasking, and a new parent (categorized threat)team's.
2 You (certainly and in the in the in the TA (categorized and place- and the in- and threat, and the following in-protocol (in the following-when-likely-branch 43-CT-PLACETTA-CT-APT-protocol. The following in-protocol that has-APT-PT-APT- and-identified (in-technical  2020- 1- 0-technical 2 (injected- 2020- 1-embedded and 0-technical, with a DCP (in-protocol)network 2020.
3 C we we we we weark (C-internal we are the light we, a place we, a place we are we (C-unknown (C: a-fsigh (C.Industry we, we we we we we we, we we we, we, taking we, we, we we. The place we, we we we we we we. The internal we. The place the we.
4 The registry key "\BaseNamedObjects\F932B6C7–3A20–46A0-B8A0–8894AA421973" appears to be a part of the Windows Registry, which is a central database for all c